async_producer_test.go 34 KB


  1. package sarama
  2. import (
  3. "errors"
  4. "log"
  5. "os"
  6. "os/signal"
  7. "sync"
  8. "testing"
  9. "time"
  10. )
  11. const TestMessage = "ABC THE MESSAGE"
  12. func closeProducer(t *testing.T, p AsyncProducer) {
  13. var wg sync.WaitGroup
  14. p.AsyncClose()
  15. wg.Add(2)
  16. go func() {
  17. for range p.Successes() {
  18. t.Error("Unexpected message on Successes()")
  19. }
  20. wg.Done()
  21. }()
  22. go func() {
  23. for msg := range p.Errors() {
  24. t.Error(msg.Err)
  25. }
  26. wg.Done()
  27. }()
  28. wg.Wait()
  29. }
  30. func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
  31. expect := successes + errors
  32. for expect > 0 {
  33. select {
  34. case msg := <-p.Errors():
  35. if msg.Msg.flags != 0 {
  36. t.Error("Message had flags set")
  37. }
  38. errors--
  39. expect--
  40. if errors < 0 {
  41. t.Error(msg.Err)
  42. }
  43. case msg := <-p.Successes():
  44. if msg.flags != 0 {
  45. t.Error("Message had flags set")
  46. }
  47. successes--
  48. expect--
  49. if successes < 0 {
  50. t.Error("Too many successes")
  51. }
  52. }
  53. }
  54. if successes != 0 || errors != 0 {
  55. t.Error("Unexpected successes", successes, "or errors", errors)
  56. }
  57. }
  58. type testPartitioner chan *int32
  59. func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
  60. part := <-p
  61. if part == nil {
  62. return 0, errors.New("BOOM")
  63. }
  64. return *part, nil
  65. }
  66. func (p testPartitioner) RequiresConsistency() bool {
  67. return true
  68. }
  69. func (p testPartitioner) feed(partition int32) {
  70. p <- &partition
  71. }
  72. type flakyEncoder bool
  73. func (f flakyEncoder) Length() int {
  74. return len(TestMessage)
  75. }
  76. func (f flakyEncoder) Encode() ([]byte, error) {
  77. if !bool(f) {
  78. return nil, errors.New("flaky encoding error")
  79. }
  80. return []byte(TestMessage), nil
  81. }
  82. func TestAsyncProducer(t *testing.T) {
  83. seedBroker := NewMockBroker(t, 1)
  84. leader := NewMockBroker(t, 2)
  85. metadataResponse := new(MetadataResponse)
  86. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  87. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  88. seedBroker.Returns(metadataResponse)
  89. prodSuccess := new(ProduceResponse)
  90. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  91. leader.Returns(prodSuccess)
  92. config := NewConfig()
  93. config.Producer.Flush.Messages = 10
  94. config.Producer.Return.Successes = true
  95. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  96. if err != nil {
  97. t.Fatal(err)
  98. }
  99. for i := 0; i < 10; i++ {
  100. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
  101. }
  102. for i := 0; i < 10; i++ {
  103. select {
  104. case msg := <-producer.Errors():
  105. t.Error(msg.Err)
  106. if msg.Msg.flags != 0 {
  107. t.Error("Message had flags set")
  108. }
  109. case msg := <-producer.Successes():
  110. if msg.flags != 0 {
  111. t.Error("Message had flags set")
  112. }
  113. if msg.Metadata.(int) != i {
  114. t.Error("Message metadata did not match")
  115. }
  116. case <-time.After(time.Second):
  117. t.Errorf("Timeout waiting for msg #%d", i)
  118. goto done
  119. }
  120. }
  121. done:
  122. closeProducer(t, producer)
  123. leader.Close()
  124. seedBroker.Close()
  125. }
  126. func TestAsyncProducerMultipleFlushes(t *testing.T) {
  127. seedBroker := NewMockBroker(t, 1)
  128. leader := NewMockBroker(t, 2)
  129. metadataResponse := new(MetadataResponse)
  130. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  131. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  132. seedBroker.Returns(metadataResponse)
  133. prodSuccess := new(ProduceResponse)
  134. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  135. leader.Returns(prodSuccess)
  136. leader.Returns(prodSuccess)
  137. leader.Returns(prodSuccess)
  138. config := NewConfig()
  139. config.Producer.Flush.Messages = 5
  140. config.Producer.Return.Successes = true
  141. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  142. if err != nil {
  143. t.Fatal(err)
  144. }
  145. for flush := 0; flush < 3; flush++ {
  146. for i := 0; i < 5; i++ {
  147. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  148. }
  149. expectResults(t, producer, 5, 0)
  150. }
  151. closeProducer(t, producer)
  152. leader.Close()
  153. seedBroker.Close()
  154. }
  155. func TestAsyncProducerMultipleBrokers(t *testing.T) {
  156. seedBroker := NewMockBroker(t, 1)
  157. leader0 := NewMockBroker(t, 2)
  158. leader1 := NewMockBroker(t, 3)
  159. metadataResponse := new(MetadataResponse)
  160. metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
  161. metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
  162. metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
  163. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  164. seedBroker.Returns(metadataResponse)
  165. prodResponse0 := new(ProduceResponse)
  166. prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError)
  167. leader0.Returns(prodResponse0)
  168. prodResponse1 := new(ProduceResponse)
  169. prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
  170. leader1.Returns(prodResponse1)
  171. config := NewConfig()
  172. config.Producer.Flush.Messages = 5
  173. config.Producer.Return.Successes = true
  174. config.Producer.Partitioner = NewRoundRobinPartitioner
  175. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  176. if err != nil {
  177. t.Fatal(err)
  178. }
  179. for i := 0; i < 10; i++ {
  180. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  181. }
  182. expectResults(t, producer, 10, 0)
  183. closeProducer(t, producer)
  184. leader1.Close()
  185. leader0.Close()
  186. seedBroker.Close()
  187. }
  188. func TestAsyncProducerCustomPartitioner(t *testing.T) {
  189. seedBroker := NewMockBroker(t, 1)
  190. leader := NewMockBroker(t, 2)
  191. metadataResponse := new(MetadataResponse)
  192. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  193. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  194. seedBroker.Returns(metadataResponse)
  195. prodResponse := new(ProduceResponse)
  196. prodResponse.AddTopicPartition("my_topic", 0, ErrNoError)
  197. leader.Returns(prodResponse)
  198. config := NewConfig()
  199. config.Producer.Flush.Messages = 2
  200. config.Producer.Return.Successes = true
  201. config.Producer.Partitioner = func(topic string) Partitioner {
  202. p := make(testPartitioner)
  203. go func() {
  204. p.feed(0)
  205. p <- nil
  206. p <- nil
  207. p <- nil
  208. p.feed(0)
  209. }()
  210. return p
  211. }
  212. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  213. if err != nil {
  214. t.Fatal(err)
  215. }
  216. for i := 0; i < 5; i++ {
  217. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  218. }
  219. expectResults(t, producer, 2, 3)
  220. closeProducer(t, producer)
  221. leader.Close()
  222. seedBroker.Close()
  223. }
  224. func TestAsyncProducerFailureRetry(t *testing.T) {
  225. seedBroker := NewMockBroker(t, 1)
  226. leader1 := NewMockBroker(t, 2)
  227. leader2 := NewMockBroker(t, 3)
  228. metadataLeader1 := new(MetadataResponse)
  229. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  230. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  231. seedBroker.Returns(metadataLeader1)
  232. config := NewConfig()
  233. config.Producer.Flush.Messages = 10
  234. config.Producer.Return.Successes = true
  235. config.Producer.Retry.Backoff = 0
  236. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  237. if err != nil {
  238. t.Fatal(err)
  239. }
  240. seedBroker.Close()
  241. for i := 0; i < 10; i++ {
  242. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  243. }
  244. prodNotLeader := new(ProduceResponse)
  245. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  246. leader1.Returns(prodNotLeader)
  247. metadataLeader2 := new(MetadataResponse)
  248. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  249. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  250. leader1.Returns(metadataLeader2)
  251. prodSuccess := new(ProduceResponse)
  252. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  253. leader2.Returns(prodSuccess)
  254. expectResults(t, producer, 10, 0)
  255. leader1.Close()
  256. for i := 0; i < 10; i++ {
  257. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  258. }
  259. leader2.Returns(prodSuccess)
  260. expectResults(t, producer, 10, 0)
  261. leader2.Close()
  262. closeProducer(t, producer)
  263. }
  264. func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
  265. tt := func(t *testing.T, kErr KError) {
  266. seedBroker := NewMockBroker(t, 1)
  267. leader1 := NewMockBroker(t, 2)
  268. leader2 := NewMockBroker(t, 3)
  269. metadataLeader1 := new(MetadataResponse)
  270. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  271. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  272. metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  273. seedBroker.Returns(metadataLeader1)
  274. config := NewConfig()
  275. config.Producer.Flush.Messages = 2
  276. config.Producer.Return.Successes = true
  277. config.Producer.Retry.Max = 0 // disable!
  278. config.Producer.Retry.Backoff = 0
  279. config.Producer.Partitioner = NewManualPartitioner
  280. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  281. if err != nil {
  282. t.Fatal(err)
  283. }
  284. seedBroker.Close()
  285. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  286. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
  287. prodNotLeader := new(ProduceResponse)
  288. prodNotLeader.AddTopicPartition("my_topic", 0, kErr)
  289. prodNotLeader.AddTopicPartition("my_topic", 1, kErr)
  290. leader1.Returns(prodNotLeader)
  291. expectResults(t, producer, 0, 2)
  292. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  293. metadataLeader2 := new(MetadataResponse)
  294. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  295. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  296. metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, ErrNoError)
  297. leader1.Returns(metadataLeader2)
  298. leader1.Returns(metadataLeader2)
  299. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
  300. prodSuccess := new(ProduceResponse)
  301. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  302. prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
  303. leader2.Returns(prodSuccess)
  304. expectResults(t, producer, 2, 0)
  305. leader1.Close()
  306. leader2.Close()
  307. closeProducer(t, producer)
  308. }
  309. t.Run("retriable error", func(t *testing.T) {
  310. tt(t, ErrNotLeaderForPartition)
  311. })
  312. t.Run("non-retriable error", func(t *testing.T) {
  313. tt(t, ErrNotController)
  314. })
  315. }
  316. func TestAsyncProducerEncoderFailures(t *testing.T) {
  317. seedBroker := NewMockBroker(t, 1)
  318. leader := NewMockBroker(t, 2)
  319. metadataResponse := new(MetadataResponse)
  320. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  321. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  322. seedBroker.Returns(metadataResponse)
  323. prodSuccess := new(ProduceResponse)
  324. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  325. leader.Returns(prodSuccess)
  326. leader.Returns(prodSuccess)
  327. leader.Returns(prodSuccess)
  328. config := NewConfig()
  329. config.Producer.Flush.Messages = 1
  330. config.Producer.Return.Successes = true
  331. config.Producer.Partitioner = NewManualPartitioner
  332. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  333. if err != nil {
  334. t.Fatal(err)
  335. }
  336. for flush := 0; flush < 3; flush++ {
  337. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)}
  338. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)}
  339. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
  340. expectResults(t, producer, 1, 2)
  341. }
  342. closeProducer(t, producer)
  343. leader.Close()
  344. seedBroker.Close()
  345. }
  346. // If a Kafka broker becomes unavailable and then returns back in service, then
  347. // producer reconnects to it and continues sending messages.
  348. func TestAsyncProducerBrokerBounce(t *testing.T) {
  349. // Given
  350. seedBroker := NewMockBroker(t, 1)
  351. leader := NewMockBroker(t, 2)
  352. leaderAddr := leader.Addr()
  353. metadataResponse := new(MetadataResponse)
  354. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  355. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  356. seedBroker.Returns(metadataResponse)
  357. prodSuccess := new(ProduceResponse)
  358. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  359. config := NewConfig()
  360. config.Producer.Flush.Messages = 1
  361. config.Producer.Return.Successes = true
  362. config.Producer.Retry.Backoff = 0
  363. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  364. if err != nil {
  365. t.Fatal(err)
  366. }
  367. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  368. leader.Returns(prodSuccess)
  369. expectResults(t, producer, 1, 0)
  370. // When: a broker connection gets reset by a broker (network glitch, restart, you name it).
  371. leader.Close() // producer should get EOF
  372. leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
  373. seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again
  374. // Then: a produced message goes through the new broker connection.
  375. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  376. leader.Returns(prodSuccess)
  377. expectResults(t, producer, 1, 0)
  378. closeProducer(t, producer)
  379. seedBroker.Close()
  380. leader.Close()
  381. }
  382. func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
  383. seedBroker := NewMockBroker(t, 1)
  384. leader1 := NewMockBroker(t, 2)
  385. leader2 := NewMockBroker(t, 3)
  386. metadataLeader1 := new(MetadataResponse)
  387. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  388. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  389. seedBroker.Returns(metadataLeader1)
  390. config := NewConfig()
  391. config.Producer.Flush.Messages = 10
  392. config.Producer.Return.Successes = true
  393. config.Producer.Retry.Max = 3
  394. config.Producer.Retry.Backoff = 0
  395. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  396. if err != nil {
  397. t.Fatal(err)
  398. }
  399. for i := 0; i < 10; i++ {
  400. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  401. }
  402. leader1.Close() // producer should get EOF
  403. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  404. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  405. // ok fine, tell it to go to leader2 finally
  406. metadataLeader2 := new(MetadataResponse)
  407. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  408. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  409. seedBroker.Returns(metadataLeader2)
  410. prodSuccess := new(ProduceResponse)
  411. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  412. leader2.Returns(prodSuccess)
  413. expectResults(t, producer, 10, 0)
  414. seedBroker.Close()
  415. leader2.Close()
  416. closeProducer(t, producer)
  417. }
  418. func TestAsyncProducerMultipleRetries(t *testing.T) {
  419. seedBroker := NewMockBroker(t, 1)
  420. leader1 := NewMockBroker(t, 2)
  421. leader2 := NewMockBroker(t, 3)
  422. metadataLeader1 := new(MetadataResponse)
  423. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  424. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  425. seedBroker.Returns(metadataLeader1)
  426. config := NewConfig()
  427. config.Producer.Flush.Messages = 10
  428. config.Producer.Return.Successes = true
  429. config.Producer.Retry.Max = 4
  430. config.Producer.Retry.Backoff = 0
  431. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  432. if err != nil {
  433. t.Fatal(err)
  434. }
  435. for i := 0; i < 10; i++ {
  436. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  437. }
  438. prodNotLeader := new(ProduceResponse)
  439. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  440. leader1.Returns(prodNotLeader)
  441. metadataLeader2 := new(MetadataResponse)
  442. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  443. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  444. seedBroker.Returns(metadataLeader2)
  445. leader2.Returns(prodNotLeader)
  446. seedBroker.Returns(metadataLeader1)
  447. leader1.Returns(prodNotLeader)
  448. seedBroker.Returns(metadataLeader1)
  449. leader1.Returns(prodNotLeader)
  450. seedBroker.Returns(metadataLeader2)
  451. prodSuccess := new(ProduceResponse)
  452. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  453. leader2.Returns(prodSuccess)
  454. expectResults(t, producer, 10, 0)
  455. for i := 0; i < 10; i++ {
  456. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  457. }
  458. leader2.Returns(prodSuccess)
  459. expectResults(t, producer, 10, 0)
  460. seedBroker.Close()
  461. leader1.Close()
  462. leader2.Close()
  463. closeProducer(t, producer)
  464. }
  465. func TestAsyncProducerOutOfRetries(t *testing.T) {
  466. t.Skip("Enable once bug #294 is fixed.")
  467. seedBroker := NewMockBroker(t, 1)
  468. leader := NewMockBroker(t, 2)
  469. metadataResponse := new(MetadataResponse)
  470. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  471. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  472. seedBroker.Returns(metadataResponse)
  473. config := NewConfig()
  474. config.Producer.Flush.Messages = 10
  475. config.Producer.Return.Successes = true
  476. config.Producer.Retry.Backoff = 0
  477. config.Producer.Retry.Max = 0
  478. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  479. if err != nil {
  480. t.Fatal(err)
  481. }
  482. for i := 0; i < 10; i++ {
  483. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  484. }
  485. prodNotLeader := new(ProduceResponse)
  486. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  487. leader.Returns(prodNotLeader)
  488. for i := 0; i < 10; i++ {
  489. select {
  490. case msg := <-producer.Errors():
  491. if msg.Err != ErrNotLeaderForPartition {
  492. t.Error(msg.Err)
  493. }
  494. case <-producer.Successes():
  495. t.Error("Unexpected success")
  496. }
  497. }
  498. seedBroker.Returns(metadataResponse)
  499. for i := 0; i < 10; i++ {
  500. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  501. }
  502. prodSuccess := new(ProduceResponse)
  503. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  504. leader.Returns(prodSuccess)
  505. expectResults(t, producer, 10, 0)
  506. leader.Close()
  507. seedBroker.Close()
  508. safeClose(t, producer)
  509. }
  510. func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
  511. seedBroker := NewMockBroker(t, 1)
  512. leader := NewMockBroker(t, 2)
  513. leaderAddr := leader.Addr()
  514. metadataResponse := new(MetadataResponse)
  515. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  516. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  517. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
  518. seedBroker.Returns(metadataResponse)
  519. config := NewConfig()
  520. config.Producer.Return.Successes = true
  521. config.Producer.Retry.Backoff = 0
  522. config.Producer.Retry.Max = 1
  523. config.Producer.Partitioner = NewRoundRobinPartitioner
  524. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  525. if err != nil {
  526. t.Fatal(err)
  527. }
  528. // prime partition 0
  529. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  530. prodSuccess := new(ProduceResponse)
  531. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  532. leader.Returns(prodSuccess)
  533. expectResults(t, producer, 1, 0)
  534. // prime partition 1
  535. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  536. prodSuccess = new(ProduceResponse)
  537. prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
  538. leader.Returns(prodSuccess)
  539. expectResults(t, producer, 1, 0)
  540. // reboot the broker (the producer will get EOF on its existing connection)
  541. leader.Close()
  542. leader = NewMockBrokerAddr(t, 2, leaderAddr)
  543. // send another message on partition 0 to trigger the EOF and retry
  544. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  545. // tell partition 0 to go to that broker again
  546. seedBroker.Returns(metadataResponse)
  547. // succeed this time
  548. prodSuccess = new(ProduceResponse)
  549. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  550. leader.Returns(prodSuccess)
  551. expectResults(t, producer, 1, 0)
  552. // shutdown
  553. closeProducer(t, producer)
  554. seedBroker.Close()
  555. leader.Close()
  556. }
  557. func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
  558. seedBroker := NewMockBroker(t, 1)
  559. leader := NewMockBroker(t, 2)
  560. metadataResponse := new(MetadataResponse)
  561. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  562. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  563. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
  564. seedBroker.Returns(metadataResponse)
  565. config := NewConfig()
  566. config.Producer.Flush.Messages = 5
  567. config.Producer.Return.Successes = true
  568. config.Producer.Retry.Backoff = 0
  569. config.Producer.Retry.Max = 1
  570. config.Producer.Partitioner = NewManualPartitioner
  571. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  572. if err != nil {
  573. t.Fatal(err)
  574. }
  575. // prime partitions
  576. for p := int32(0); p < 2; p++ {
  577. for i := 0; i < 5; i++ {
  578. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p}
  579. }
  580. prodSuccess := new(ProduceResponse)
  581. prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
  582. leader.Returns(prodSuccess)
  583. expectResults(t, producer, 5, 0)
  584. }
  585. // send more messages on partition 0
  586. for i := 0; i < 5; i++ {
  587. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  588. }
  589. prodNotLeader := new(ProduceResponse)
  590. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  591. leader.Returns(prodNotLeader)
  592. time.Sleep(50 * time.Millisecond)
  593. leader.SetHandlerByMap(map[string]MockResponse{
  594. "ProduceRequest": NewMockProduceResponse(t).
  595. SetVersion(0).
  596. SetError("my_topic", 0, ErrNoError),
  597. })
  598. // tell partition 0 to go to that broker again
  599. seedBroker.Returns(metadataResponse)
  600. // succeed this time
  601. expectResults(t, producer, 5, 0)
  602. // put five more through
  603. for i := 0; i < 5; i++ {
  604. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  605. }
  606. expectResults(t, producer, 5, 0)
  607. // shutdown
  608. closeProducer(t, producer)
  609. seedBroker.Close()
  610. leader.Close()
  611. }
  612. func TestAsyncProducerRetryShutdown(t *testing.T) {
  613. seedBroker := NewMockBroker(t, 1)
  614. leader := NewMockBroker(t, 2)
  615. metadataLeader := new(MetadataResponse)
  616. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  617. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  618. seedBroker.Returns(metadataLeader)
  619. config := NewConfig()
  620. config.Producer.Flush.Messages = 10
  621. config.Producer.Return.Successes = true
  622. config.Producer.Retry.Backoff = 0
  623. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  624. if err != nil {
  625. t.Fatal(err)
  626. }
  627. for i := 0; i < 10; i++ {
  628. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  629. }
  630. producer.AsyncClose()
  631. time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in
  632. producer.Input() <- &ProducerMessage{Topic: "FOO"}
  633. if err := <-producer.Errors(); err.Err != ErrShuttingDown {
  634. t.Error(err)
  635. }
  636. prodNotLeader := new(ProduceResponse)
  637. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  638. leader.Returns(prodNotLeader)
  639. seedBroker.Returns(metadataLeader)
  640. prodSuccess := new(ProduceResponse)
  641. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  642. leader.Returns(prodSuccess)
  643. expectResults(t, producer, 10, 0)
  644. seedBroker.Close()
  645. leader.Close()
  646. // wait for the async-closed producer to shut down fully
  647. for err := range producer.Errors() {
  648. t.Error(err)
  649. }
  650. }
  651. func TestAsyncProducerNoReturns(t *testing.T) {
  652. seedBroker := NewMockBroker(t, 1)
  653. leader := NewMockBroker(t, 2)
  654. metadataLeader := new(MetadataResponse)
  655. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  656. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  657. seedBroker.Returns(metadataLeader)
  658. config := NewConfig()
  659. config.Producer.Flush.Messages = 10
  660. config.Producer.Return.Successes = false
  661. config.Producer.Return.Errors = false
  662. config.Producer.Retry.Backoff = 0
  663. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  664. if err != nil {
  665. t.Fatal(err)
  666. }
  667. for i := 0; i < 10; i++ {
  668. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  669. }
  670. wait := make(chan bool)
  671. go func() {
  672. if err := producer.Close(); err != nil {
  673. t.Error(err)
  674. }
  675. close(wait)
  676. }()
  677. prodSuccess := new(ProduceResponse)
  678. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  679. leader.Returns(prodSuccess)
  680. <-wait
  681. seedBroker.Close()
  682. leader.Close()
  683. }
  684. func TestAsyncProducerIdempotentGoldenPath(t *testing.T) {
  685. broker := NewMockBroker(t, 1)
  686. metadataResponse := &MetadataResponse{
  687. Version: 1,
  688. ControllerID: 1,
  689. }
  690. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  691. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
  692. broker.Returns(metadataResponse)
  693. initProducerID := &InitProducerIDResponse{
  694. ThrottleTime: 0,
  695. ProducerID: 1000,
  696. ProducerEpoch: 1,
  697. }
  698. broker.Returns(initProducerID)
  699. config := NewConfig()
  700. config.Producer.Flush.Messages = 10
  701. config.Producer.Return.Successes = true
  702. config.Producer.Retry.Max = 4
  703. config.Producer.RequiredAcks = WaitForAll
  704. config.Producer.Retry.Backoff = 0
  705. config.Producer.Idempotent = true
  706. config.Net.MaxOpenRequests = 1
  707. config.Version = V0_11_0_0
  708. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  709. if err != nil {
  710. t.Fatal(err)
  711. }
  712. for i := 0; i < 10; i++ {
  713. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  714. }
  715. prodSuccess := &ProduceResponse{
  716. Version: 3,
  717. ThrottleTime: 0,
  718. }
  719. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  720. broker.Returns(prodSuccess)
  721. expectResults(t, producer, 10, 0)
  722. broker.Close()
  723. closeProducer(t, producer)
  724. }
  725. func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) {
  726. //Logger = log.New(os.Stderr, "", log.LstdFlags)
  727. tests := []struct {
  728. name string
  729. failAfterWrite bool
  730. }{
  731. {"FailAfterWrite", true},
  732. {"FailBeforeWrite", false},
  733. }
  734. for _, test := range tests {
  735. broker := NewMockBroker(t, 1)
  736. metadataResponse := &MetadataResponse{
  737. Version: 1,
  738. ControllerID: 1,
  739. }
  740. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  741. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
  742. initProducerIDResponse := &InitProducerIDResponse{
  743. ThrottleTime: 0,
  744. ProducerID: 1000,
  745. ProducerEpoch: 1,
  746. }
  747. prodNotLeaderResponse := &ProduceResponse{
  748. Version: 3,
  749. ThrottleTime: 0,
  750. }
  751. prodNotLeaderResponse.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas)
  752. prodDuplicate := &ProduceResponse{
  753. Version: 3,
  754. ThrottleTime: 0,
  755. }
  756. prodDuplicate.AddTopicPartition("my_topic", 0, ErrDuplicateSequenceNumber)
  757. prodOutOfSeq := &ProduceResponse{
  758. Version: 3,
  759. ThrottleTime: 0,
  760. }
  761. prodOutOfSeq.AddTopicPartition("my_topic", 0, ErrOutOfOrderSequenceNumber)
  762. prodSuccessResponse := &ProduceResponse{
  763. Version: 3,
  764. ThrottleTime: 0,
  765. }
  766. prodSuccessResponse.AddTopicPartition("my_topic", 0, ErrNoError)
  767. prodCounter := 0
  768. lastBatchFirstSeq := -1
  769. lastBatchSize := -1
  770. lastSequenceWrittenToDisk := -1
  771. handlerFailBeforeWrite := func(req *request) (res encoder) {
  772. switch req.body.key() {
  773. case 3:
  774. return metadataResponse
  775. case 22:
  776. return initProducerIDResponse
  777. case 0:
  778. prodCounter++
  779. preq := req.body.(*ProduceRequest)
  780. batch := preq.records["my_topic"][0].RecordBatch
  781. batchFirstSeq := int(batch.FirstSequence)
  782. batchSize := len(batch.Records)
  783. if lastSequenceWrittenToDisk == batchFirstSeq-1 { //in sequence append
  784. if lastBatchFirstSeq == batchFirstSeq { //is a batch retry
  785. if lastBatchSize == batchSize { //good retry
  786. // mock write to disk
  787. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  788. return prodSuccessResponse
  789. }
  790. t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
  791. return prodOutOfSeq
  792. } else { // not a retry
  793. // save batch just received for future check
  794. lastBatchFirstSeq = batchFirstSeq
  795. lastBatchSize = batchSize
  796. if prodCounter%2 == 1 {
  797. if test.failAfterWrite {
  798. // mock write to disk
  799. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  800. }
  801. return prodNotLeaderResponse
  802. }
  803. // mock write to disk
  804. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  805. return prodSuccessResponse
  806. }
  807. } else {
  808. if lastBatchFirstSeq == batchFirstSeq && lastBatchSize == batchSize { // is a good batch retry
  809. if lastSequenceWrittenToDisk == (batchFirstSeq + batchSize - 1) { // we already have the messages
  810. return prodDuplicate
  811. }
  812. // mock write to disk
  813. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  814. return prodSuccessResponse
  815. } else { //out of sequence / bad retried batch
  816. if lastBatchFirstSeq == batchFirstSeq && lastBatchSize != batchSize {
  817. t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
  818. } else if lastSequenceWrittenToDisk+1 != batchFirstSeq {
  819. t.Errorf("[%s] Out of sequence message lastSequence=%d new batch starts at=%d", test.name, lastSequenceWrittenToDisk, batchFirstSeq)
  820. } else {
  821. t.Errorf("[%s] Unexpected error", test.name)
  822. }
  823. return prodOutOfSeq
  824. }
  825. }
  826. }
  827. return nil
  828. }
  829. config := NewConfig()
  830. config.Version = V0_11_0_0
  831. config.Producer.Idempotent = true
  832. config.Net.MaxOpenRequests = 1
  833. config.Producer.RequiredAcks = WaitForAll
  834. config.Producer.Return.Successes = true
  835. config.Producer.Flush.Frequency = 50 * time.Millisecond
  836. config.Producer.Retry.Backoff = 100 * time.Millisecond
  837. broker.setHandler(handlerFailBeforeWrite)
  838. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  839. if err != nil {
  840. t.Fatal(err)
  841. }
  842. for i := 0; i < 3; i++ {
  843. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  844. }
  845. go func() {
  846. for i := 0; i < 7; i++ {
  847. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("goroutine")}
  848. time.Sleep(100 * time.Millisecond)
  849. }
  850. }()
  851. expectResults(t, producer, 10, 0)
  852. broker.Close()
  853. closeProducer(t, producer)
  854. }
  855. }
  856. func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
  857. broker := NewMockBroker(t, 1)
  858. metadataResponse := &MetadataResponse{
  859. Version: 1,
  860. ControllerID: 1,
  861. }
  862. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  863. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
  864. broker.Returns(metadataResponse)
  865. initProducerID := &InitProducerIDResponse{
  866. ThrottleTime: 0,
  867. ProducerID: 1000,
  868. ProducerEpoch: 1,
  869. }
  870. broker.Returns(initProducerID)
  871. config := NewConfig()
  872. config.Producer.Flush.Messages = 10
  873. config.Producer.Return.Successes = true
  874. config.Producer.Retry.Max = 400000
  875. config.Producer.RequiredAcks = WaitForAll
  876. config.Producer.Retry.Backoff = 0
  877. config.Producer.Idempotent = true
  878. config.Net.MaxOpenRequests = 1
  879. config.Version = V0_11_0_0
  880. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  881. if err != nil {
  882. t.Fatal(err)
  883. }
  884. for i := 0; i < 10; i++ {
  885. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  886. }
  887. prodOutOfSeq := &ProduceResponse{
  888. Version: 3,
  889. ThrottleTime: 0,
  890. }
  891. prodOutOfSeq.AddTopicPartition("my_topic", 0, ErrOutOfOrderSequenceNumber)
  892. broker.Returns(prodOutOfSeq)
  893. expectResults(t, producer, 0, 10)
  894. broker.Close()
  895. closeProducer(t, producer)
  896. }
  897. // This example shows how to use the producer while simultaneously
  898. // reading the Errors channel to know about any failures.
  899. func ExampleAsyncProducer_select() {
  900. producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
  901. if err != nil {
  902. panic(err)
  903. }
  904. defer func() {
  905. if err := producer.Close(); err != nil {
  906. log.Fatalln(err)
  907. }
  908. }()
  909. // Trap SIGINT to trigger a shutdown.
  910. signals := make(chan os.Signal, 1)
  911. signal.Notify(signals, os.Interrupt)
  912. var enqueued, errors int
  913. ProducerLoop:
  914. for {
  915. select {
  916. case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
  917. enqueued++
  918. case err := <-producer.Errors():
  919. log.Println("Failed to produce message", err)
  920. errors++
  921. case <-signals:
  922. break ProducerLoop
  923. }
  924. }
  925. log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
  926. }
  927. // This example shows how to use the producer with separate goroutines
  928. // reading from the Successes and Errors channels. Note that in order
  929. // for the Successes channel to be populated, you have to set
  930. // config.Producer.Return.Successes to true.
  931. func ExampleAsyncProducer_goroutines() {
  932. config := NewConfig()
  933. config.Producer.Return.Successes = true
  934. producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
  935. if err != nil {
  936. panic(err)
  937. }
  938. // Trap SIGINT to trigger a graceful shutdown.
  939. signals := make(chan os.Signal, 1)
  940. signal.Notify(signals, os.Interrupt)
  941. var (
  942. wg sync.WaitGroup
  943. enqueued, successes, errors int
  944. )
  945. wg.Add(1)
  946. go func() {
  947. defer wg.Done()
  948. for range producer.Successes() {
  949. successes++
  950. }
  951. }()
  952. wg.Add(1)
  953. go func() {
  954. defer wg.Done()
  955. for err := range producer.Errors() {
  956. log.Println(err)
  957. errors++
  958. }
  959. }()
  960. ProducerLoop:
  961. for {
  962. message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
  963. select {
  964. case producer.Input() <- message:
  965. enqueued++
  966. case <-signals:
  967. producer.AsyncClose() // Trigger a shutdown of the producer.
  968. break ProducerLoop
  969. }
  970. }
  971. wg.Wait()
  972. log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
  973. }