async_producer_test.go 31 KB


  1. package sarama
  2. import (
  3. "errors"
  4. "log"
  5. "os"
  6. "os/signal"
  7. "sync"
  8. "testing"
  9. "time"
  10. )
  // TestMessage is the payload the tests in this file use as message content
  // (wrapped in StringEncoder, or returned by flakyEncoder on success).
  11. const TestMessage = "ABC THE MESSAGE"
  12. func closeProducer(t *testing.T, p AsyncProducer) {
  13. var wg sync.WaitGroup
  14. p.AsyncClose()
  15. wg.Add(2)
  16. go func() {
  17. for range p.Successes() {
  18. t.Error("Unexpected message on Successes()")
  19. }
  20. wg.Done()
  21. }()
  22. go func() {
  23. for msg := range p.Errors() {
  24. t.Error(msg.Err)
  25. }
  26. wg.Done()
  27. }()
  28. wg.Wait()
  29. }
  30. func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
  31. expect := successes + errors
  32. for expect > 0 {
  33. select {
  34. case msg := <-p.Errors():
  35. if msg.Msg.flags != 0 {
  36. t.Error("Message had flags set")
  37. }
  38. errors--
  39. expect--
  40. if errors < 0 {
  41. t.Error(msg.Err)
  42. }
  43. case msg := <-p.Successes():
  44. if msg.flags != 0 {
  45. t.Error("Message had flags set")
  46. }
  47. successes--
  48. expect--
  49. if successes < 0 {
  50. t.Error("Too many successes")
  51. }
  52. }
  53. }
  54. if successes != 0 || errors != 0 {
  55. t.Error("Unexpected successes", successes, "or errors", errors)
  56. }
  57. }
  58. type testPartitioner chan *int32
  59. func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
  60. part := <-p
  61. if part == nil {
  62. return 0, errors.New("BOOM")
  63. }
  64. return *part, nil
  65. }
  66. func (p testPartitioner) RequiresConsistency() bool {
  67. return true
  68. }
  69. func (p testPartitioner) feed(partition int32) {
  70. p <- &partition
  71. }
  72. type flakyEncoder bool
  73. func (f flakyEncoder) Length() int {
  74. return len(TestMessage)
  75. }
  76. func (f flakyEncoder) Encode() ([]byte, error) {
  77. if !bool(f) {
  78. return nil, errors.New("flaky encoding error")
  79. }
  80. return []byte(TestMessage), nil
  81. }
  82. func TestAsyncProducer(t *testing.T) {
  83. seedBroker := NewMockBroker(t, 1)
  84. leader := NewMockBroker(t, 2)
  85. metadataResponse := new(MetadataResponse)
  86. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  87. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  88. seedBroker.Returns(metadataResponse)
  89. prodSuccess := new(ProduceResponse)
  90. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  91. leader.Returns(prodSuccess)
  92. config := NewConfig()
  93. config.Producer.Flush.Messages = 10
  94. config.Producer.Return.Successes = true
  95. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  96. if err != nil {
  97. t.Fatal(err)
  98. }
  99. for i := 0; i < 10; i++ {
  100. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
  101. }
  102. for i := 0; i < 10; i++ {
  103. select {
  104. case msg := <-producer.Errors():
  105. t.Error(msg.Err)
  106. if msg.Msg.flags != 0 {
  107. t.Error("Message had flags set")
  108. }
  109. case msg := <-producer.Successes():
  110. if msg.flags != 0 {
  111. t.Error("Message had flags set")
  112. }
  113. if msg.Metadata.(int) != i {
  114. t.Error("Message metadata did not match")
  115. }
  116. case <-time.After(time.Second):
  117. t.Errorf("Timeout waiting for msg #%d", i)
  118. goto done
  119. }
  120. }
  121. done:
  122. closeProducer(t, producer)
  123. leader.Close()
  124. seedBroker.Close()
  125. }
  126. func TestAsyncProducerMultipleFlushes(t *testing.T) {
  127. seedBroker := NewMockBroker(t, 1)
  128. leader := NewMockBroker(t, 2)
  129. metadataResponse := new(MetadataResponse)
  130. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  131. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  132. seedBroker.Returns(metadataResponse)
  133. prodSuccess := new(ProduceResponse)
  134. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  135. leader.Returns(prodSuccess)
  136. leader.Returns(prodSuccess)
  137. leader.Returns(prodSuccess)
  138. config := NewConfig()
  139. config.Producer.Flush.Messages = 5
  140. config.Producer.Return.Successes = true
  141. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  142. if err != nil {
  143. t.Fatal(err)
  144. }
  145. for flush := 0; flush < 3; flush++ {
  146. for i := 0; i < 5; i++ {
  147. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  148. }
  149. expectResults(t, producer, 5, 0)
  150. }
  151. closeProducer(t, producer)
  152. leader.Close()
  153. seedBroker.Close()
  154. }
  155. func TestAsyncProducerMultipleBrokers(t *testing.T) {
  156. seedBroker := NewMockBroker(t, 1)
  157. leader0 := NewMockBroker(t, 2)
  158. leader1 := NewMockBroker(t, 3)
  159. metadataResponse := new(MetadataResponse)
  160. metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
  161. metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
  162. metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
  163. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  164. seedBroker.Returns(metadataResponse)
  165. prodResponse0 := new(ProduceResponse)
  166. prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError)
  167. leader0.Returns(prodResponse0)
  168. prodResponse1 := new(ProduceResponse)
  169. prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
  170. leader1.Returns(prodResponse1)
  171. config := NewConfig()
  172. config.Producer.Flush.Messages = 5
  173. config.Producer.Return.Successes = true
  174. config.Producer.Partitioner = NewRoundRobinPartitioner
  175. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  176. if err != nil {
  177. t.Fatal(err)
  178. }
  179. for i := 0; i < 10; i++ {
  180. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  181. }
  182. expectResults(t, producer, 10, 0)
  183. closeProducer(t, producer)
  184. leader1.Close()
  185. leader0.Close()
  186. seedBroker.Close()
  187. }
  188. func TestAsyncProducerCustomPartitioner(t *testing.T) {
  189. seedBroker := NewMockBroker(t, 1)
  190. leader := NewMockBroker(t, 2)
  191. metadataResponse := new(MetadataResponse)
  192. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  193. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  194. seedBroker.Returns(metadataResponse)
  195. prodResponse := new(ProduceResponse)
  196. prodResponse.AddTopicPartition("my_topic", 0, ErrNoError)
  197. leader.Returns(prodResponse)
  198. config := NewConfig()
  199. config.Producer.Flush.Messages = 2
  200. config.Producer.Return.Successes = true
  201. config.Producer.Partitioner = func(topic string) Partitioner {
  202. p := make(testPartitioner)
  203. go func() {
  204. p.feed(0)
  205. p <- nil
  206. p <- nil
  207. p <- nil
  208. p.feed(0)
  209. }()
  210. return p
  211. }
  212. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  213. if err != nil {
  214. t.Fatal(err)
  215. }
  216. for i := 0; i < 5; i++ {
  217. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  218. }
  219. expectResults(t, producer, 2, 3)
  220. closeProducer(t, producer)
  221. leader.Close()
  222. seedBroker.Close()
  223. }
  224. func TestAsyncProducerFailureRetry(t *testing.T) {
  225. seedBroker := NewMockBroker(t, 1)
  226. leader1 := NewMockBroker(t, 2)
  227. leader2 := NewMockBroker(t, 3)
  228. metadataLeader1 := new(MetadataResponse)
  229. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  230. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  231. seedBroker.Returns(metadataLeader1)
  232. config := NewConfig()
  233. config.Producer.Flush.Messages = 10
  234. config.Producer.Return.Successes = true
  235. config.Producer.Retry.Backoff = 0
  236. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  237. if err != nil {
  238. t.Fatal(err)
  239. }
  240. seedBroker.Close()
  241. for i := 0; i < 10; i++ {
  242. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  243. }
  244. prodNotLeader := new(ProduceResponse)
  245. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  246. leader1.Returns(prodNotLeader)
  247. metadataLeader2 := new(MetadataResponse)
  248. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  249. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  250. leader1.Returns(metadataLeader2)
  251. prodSuccess := new(ProduceResponse)
  252. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  253. leader2.Returns(prodSuccess)
  254. expectResults(t, producer, 10, 0)
  255. leader1.Close()
  256. for i := 0; i < 10; i++ {
  257. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  258. }
  259. leader2.Returns(prodSuccess)
  260. expectResults(t, producer, 10, 0)
  261. leader2.Close()
  262. closeProducer(t, producer)
  263. }
  264. func TestAsyncProducerEncoderFailures(t *testing.T) {
  265. seedBroker := NewMockBroker(t, 1)
  266. leader := NewMockBroker(t, 2)
  267. metadataResponse := new(MetadataResponse)
  268. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  269. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  270. seedBroker.Returns(metadataResponse)
  271. prodSuccess := new(ProduceResponse)
  272. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  273. leader.Returns(prodSuccess)
  274. leader.Returns(prodSuccess)
  275. leader.Returns(prodSuccess)
  276. config := NewConfig()
  277. config.Producer.Flush.Messages = 1
  278. config.Producer.Return.Successes = true
  279. config.Producer.Partitioner = NewManualPartitioner
  280. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  281. if err != nil {
  282. t.Fatal(err)
  283. }
  284. for flush := 0; flush < 3; flush++ {
  285. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)}
  286. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)}
  287. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
  288. expectResults(t, producer, 1, 2)
  289. }
  290. closeProducer(t, producer)
  291. leader.Close()
  292. seedBroker.Close()
  293. }
  294. // If a Kafka broker becomes unavailable and then returns back in service, then
  295. // producer reconnects to it and continues sending messages.
  296. func TestAsyncProducerBrokerBounce(t *testing.T) {
  297. // Given
  298. seedBroker := NewMockBroker(t, 1)
  299. leader := NewMockBroker(t, 2)
  300. leaderAddr := leader.Addr()
  301. metadataResponse := new(MetadataResponse)
  302. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  303. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  304. seedBroker.Returns(metadataResponse)
  305. prodSuccess := new(ProduceResponse)
  306. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  307. config := NewConfig()
  308. config.Producer.Flush.Messages = 1
  309. config.Producer.Return.Successes = true
  310. config.Producer.Retry.Backoff = 0
  311. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  312. if err != nil {
  313. t.Fatal(err)
  314. }
  315. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  316. leader.Returns(prodSuccess)
  317. expectResults(t, producer, 1, 0)
  318. // When: a broker connection gets reset by a broker (network glitch, restart, you name it).
  319. leader.Close() // producer should get EOF
  320. leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
  321. seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again
  322. // Then: a produced message goes through the new broker connection.
  323. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  324. leader.Returns(prodSuccess)
  325. expectResults(t, producer, 1, 0)
  326. closeProducer(t, producer)
  327. seedBroker.Close()
  328. leader.Close()
  329. }
  330. func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
  331. seedBroker := NewMockBroker(t, 1)
  332. leader1 := NewMockBroker(t, 2)
  333. leader2 := NewMockBroker(t, 3)
  334. metadataLeader1 := new(MetadataResponse)
  335. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  336. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  337. seedBroker.Returns(metadataLeader1)
  338. config := NewConfig()
  339. config.Producer.Flush.Messages = 10
  340. config.Producer.Return.Successes = true
  341. config.Producer.Retry.Max = 3
  342. config.Producer.Retry.Backoff = 0
  343. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  344. if err != nil {
  345. t.Fatal(err)
  346. }
  347. for i := 0; i < 10; i++ {
  348. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  349. }
  350. leader1.Close() // producer should get EOF
  351. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  352. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  353. // ok fine, tell it to go to leader2 finally
  354. metadataLeader2 := new(MetadataResponse)
  355. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  356. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  357. seedBroker.Returns(metadataLeader2)
  358. prodSuccess := new(ProduceResponse)
  359. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  360. leader2.Returns(prodSuccess)
  361. expectResults(t, producer, 10, 0)
  362. seedBroker.Close()
  363. leader2.Close()
  364. closeProducer(t, producer)
  365. }
  366. func TestAsyncProducerMultipleRetries(t *testing.T) {
  367. seedBroker := NewMockBroker(t, 1)
  368. leader1 := NewMockBroker(t, 2)
  369. leader2 := NewMockBroker(t, 3)
  370. metadataLeader1 := new(MetadataResponse)
  371. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  372. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  373. seedBroker.Returns(metadataLeader1)
  374. config := NewConfig()
  375. config.Producer.Flush.Messages = 10
  376. config.Producer.Return.Successes = true
  377. config.Producer.Retry.Max = 4
  378. config.Producer.Retry.Backoff = 0
  379. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  380. if err != nil {
  381. t.Fatal(err)
  382. }
  383. for i := 0; i < 10; i++ {
  384. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  385. }
  386. prodNotLeader := new(ProduceResponse)
  387. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  388. leader1.Returns(prodNotLeader)
  389. metadataLeader2 := new(MetadataResponse)
  390. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  391. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  392. seedBroker.Returns(metadataLeader2)
  393. leader2.Returns(prodNotLeader)
  394. seedBroker.Returns(metadataLeader1)
  395. leader1.Returns(prodNotLeader)
  396. seedBroker.Returns(metadataLeader1)
  397. leader1.Returns(prodNotLeader)
  398. seedBroker.Returns(metadataLeader2)
  399. prodSuccess := new(ProduceResponse)
  400. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  401. leader2.Returns(prodSuccess)
  402. expectResults(t, producer, 10, 0)
  403. for i := 0; i < 10; i++ {
  404. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  405. }
  406. leader2.Returns(prodSuccess)
  407. expectResults(t, producer, 10, 0)
  408. seedBroker.Close()
  409. leader1.Close()
  410. leader2.Close()
  411. closeProducer(t, producer)
  412. }
  413. func TestAsyncProducerOutOfRetries(t *testing.T) {
  414. t.Skip("Enable once bug #294 is fixed.")
  415. seedBroker := NewMockBroker(t, 1)
  416. leader := NewMockBroker(t, 2)
  417. metadataResponse := new(MetadataResponse)
  418. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  419. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  420. seedBroker.Returns(metadataResponse)
  421. config := NewConfig()
  422. config.Producer.Flush.Messages = 10
  423. config.Producer.Return.Successes = true
  424. config.Producer.Retry.Backoff = 0
  425. config.Producer.Retry.Max = 0
  426. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  427. if err != nil {
  428. t.Fatal(err)
  429. }
  430. for i := 0; i < 10; i++ {
  431. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  432. }
  433. prodNotLeader := new(ProduceResponse)
  434. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  435. leader.Returns(prodNotLeader)
  436. for i := 0; i < 10; i++ {
  437. select {
  438. case msg := <-producer.Errors():
  439. if msg.Err != ErrNotLeaderForPartition {
  440. t.Error(msg.Err)
  441. }
  442. case <-producer.Successes():
  443. t.Error("Unexpected success")
  444. }
  445. }
  446. seedBroker.Returns(metadataResponse)
  447. for i := 0; i < 10; i++ {
  448. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  449. }
  450. prodSuccess := new(ProduceResponse)
  451. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  452. leader.Returns(prodSuccess)
  453. expectResults(t, producer, 10, 0)
  454. leader.Close()
  455. seedBroker.Close()
  456. safeClose(t, producer)
  457. }
  458. func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
  459. seedBroker := NewMockBroker(t, 1)
  460. leader := NewMockBroker(t, 2)
  461. leaderAddr := leader.Addr()
  462. metadataResponse := new(MetadataResponse)
  463. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  464. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  465. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
  466. seedBroker.Returns(metadataResponse)
  467. config := NewConfig()
  468. config.Producer.Return.Successes = true
  469. config.Producer.Retry.Backoff = 0
  470. config.Producer.Retry.Max = 1
  471. config.Producer.Partitioner = NewRoundRobinPartitioner
  472. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  473. if err != nil {
  474. t.Fatal(err)
  475. }
  476. // prime partition 0
  477. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  478. prodSuccess := new(ProduceResponse)
  479. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  480. leader.Returns(prodSuccess)
  481. expectResults(t, producer, 1, 0)
  482. // prime partition 1
  483. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  484. prodSuccess = new(ProduceResponse)
  485. prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
  486. leader.Returns(prodSuccess)
  487. expectResults(t, producer, 1, 0)
  488. // reboot the broker (the producer will get EOF on its existing connection)
  489. leader.Close()
  490. leader = NewMockBrokerAddr(t, 2, leaderAddr)
  491. // send another message on partition 0 to trigger the EOF and retry
  492. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  493. // tell partition 0 to go to that broker again
  494. seedBroker.Returns(metadataResponse)
  495. // succeed this time
  496. prodSuccess = new(ProduceResponse)
  497. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  498. leader.Returns(prodSuccess)
  499. expectResults(t, producer, 1, 0)
  500. // shutdown
  501. closeProducer(t, producer)
  502. seedBroker.Close()
  503. leader.Close()
  504. }
  505. func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
  506. seedBroker := NewMockBroker(t, 1)
  507. leader := NewMockBroker(t, 2)
  508. metadataResponse := new(MetadataResponse)
  509. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  510. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  511. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
  512. seedBroker.Returns(metadataResponse)
  513. config := NewConfig()
  514. config.Producer.Flush.Messages = 5
  515. config.Producer.Return.Successes = true
  516. config.Producer.Retry.Backoff = 0
  517. config.Producer.Retry.Max = 1
  518. config.Producer.Partitioner = NewManualPartitioner
  519. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  520. if err != nil {
  521. t.Fatal(err)
  522. }
  523. // prime partitions
  524. for p := int32(0); p < 2; p++ {
  525. for i := 0; i < 5; i++ {
  526. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p}
  527. }
  528. prodSuccess := new(ProduceResponse)
  529. prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
  530. leader.Returns(prodSuccess)
  531. expectResults(t, producer, 5, 0)
  532. }
  533. // send more messages on partition 0
  534. for i := 0; i < 5; i++ {
  535. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  536. }
  537. prodNotLeader := new(ProduceResponse)
  538. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  539. leader.Returns(prodNotLeader)
  540. time.Sleep(50 * time.Millisecond)
  541. leader.SetHandlerByMap(map[string]MockResponse{
  542. "ProduceRequest": NewMockProduceResponse(t).
  543. SetVersion(0).
  544. SetError("my_topic", 0, ErrNoError),
  545. })
  546. // tell partition 0 to go to that broker again
  547. seedBroker.Returns(metadataResponse)
  548. // succeed this time
  549. expectResults(t, producer, 5, 0)
  550. // put five more through
  551. for i := 0; i < 5; i++ {
  552. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  553. }
  554. expectResults(t, producer, 5, 0)
  555. // shutdown
  556. closeProducer(t, producer)
  557. seedBroker.Close()
  558. leader.Close()
  559. }
  560. func TestAsyncProducerRetryShutdown(t *testing.T) {
  561. seedBroker := NewMockBroker(t, 1)
  562. leader := NewMockBroker(t, 2)
  563. metadataLeader := new(MetadataResponse)
  564. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  565. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  566. seedBroker.Returns(metadataLeader)
  567. config := NewConfig()
  568. config.Producer.Flush.Messages = 10
  569. config.Producer.Return.Successes = true
  570. config.Producer.Retry.Backoff = 0
  571. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  572. if err != nil {
  573. t.Fatal(err)
  574. }
  575. for i := 0; i < 10; i++ {
  576. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  577. }
  578. producer.AsyncClose()
  579. time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in
  580. producer.Input() <- &ProducerMessage{Topic: "FOO"}
  581. if err := <-producer.Errors(); err.Err != ErrShuttingDown {
  582. t.Error(err)
  583. }
  584. prodNotLeader := new(ProduceResponse)
  585. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  586. leader.Returns(prodNotLeader)
  587. seedBroker.Returns(metadataLeader)
  588. prodSuccess := new(ProduceResponse)
  589. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  590. leader.Returns(prodSuccess)
  591. expectResults(t, producer, 10, 0)
  592. seedBroker.Close()
  593. leader.Close()
  594. // wait for the async-closed producer to shut down fully
  595. for err := range producer.Errors() {
  596. t.Error(err)
  597. }
  598. }
  599. func TestAsyncProducerNoReturns(t *testing.T) {
  600. seedBroker := NewMockBroker(t, 1)
  601. leader := NewMockBroker(t, 2)
  602. metadataLeader := new(MetadataResponse)
  603. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  604. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  605. seedBroker.Returns(metadataLeader)
  606. config := NewConfig()
  607. config.Producer.Flush.Messages = 10
  608. config.Producer.Return.Successes = false
  609. config.Producer.Return.Errors = false
  610. config.Producer.Retry.Backoff = 0
  611. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  612. if err != nil {
  613. t.Fatal(err)
  614. }
  615. for i := 0; i < 10; i++ {
  616. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  617. }
  618. wait := make(chan bool)
  619. go func() {
  620. if err := producer.Close(); err != nil {
  621. t.Error(err)
  622. }
  623. close(wait)
  624. }()
  625. prodSuccess := new(ProduceResponse)
  626. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  627. leader.Returns(prodSuccess)
  628. <-wait
  629. seedBroker.Close()
  630. leader.Close()
  631. }
  632. func TestAsyncProducerIdempotentGoldenPath(t *testing.T) {
  633. broker := NewMockBroker(t, 1)
  634. metadataResponse := &MetadataResponse{
  635. Version: 1,
  636. ControllerID: 1,
  637. }
  638. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  639. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
  640. broker.Returns(metadataResponse)
  641. initProducerID := &InitProducerIDResponse{
  642. ThrottleTime: 0,
  643. ProducerID: 1000,
  644. ProducerEpoch: 1,
  645. }
  646. broker.Returns(initProducerID)
  647. config := NewConfig()
  648. config.Producer.Flush.Messages = 10
  649. config.Producer.Return.Successes = true
  650. config.Producer.Retry.Max = 4
  651. config.Producer.RequiredAcks = WaitForAll
  652. config.Producer.Retry.Backoff = 0
  653. config.Producer.Idempotent = true
  654. config.Net.MaxOpenRequests = 1
  655. config.Version = V0_11_0_0
  656. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  657. if err != nil {
  658. t.Fatal(err)
  659. }
  660. for i := 0; i < 10; i++ {
  661. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  662. }
  663. prodSuccess := &ProduceResponse{
  664. Version: 3,
  665. ThrottleTime: 0,
  666. }
  667. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  668. broker.Returns(prodSuccess)
  669. expectResults(t, producer, 10, 0)
  670. broker.Close()
  671. closeProducer(t, producer)
  672. }
  673. func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) {
  674. //Logger = log.New(os.Stderr, "", log.LstdFlags)
  675. tests := []struct {
  676. name string
  677. failAfterWrite bool
  678. }{
  679. {"FailAfterWrite", true},
  680. {"FailBeforeWrite", false},
  681. }
  682. for _, test := range tests {
  683. broker := NewMockBroker(t, 1)
  684. metadataResponse := &MetadataResponse{
  685. Version: 1,
  686. ControllerID: 1,
  687. }
  688. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  689. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
  690. initProducerIDResponse := &InitProducerIDResponse{
  691. ThrottleTime: 0,
  692. ProducerID: 1000,
  693. ProducerEpoch: 1,
  694. }
  695. prodNotLeaderResponse := &ProduceResponse{
  696. Version: 3,
  697. ThrottleTime: 0,
  698. }
  699. prodNotLeaderResponse.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas)
  700. prodDuplicate := &ProduceResponse{
  701. Version: 3,
  702. ThrottleTime: 0,
  703. }
  704. prodDuplicate.AddTopicPartition("my_topic", 0, ErrDuplicateSequenceNumber)
  705. prodOutOfSeq := &ProduceResponse{
  706. Version: 3,
  707. ThrottleTime: 0,
  708. }
  709. prodOutOfSeq.AddTopicPartition("my_topic", 0, ErrOutOfOrderSequenceNumber)
  710. prodSuccessResponse := &ProduceResponse{
  711. Version: 3,
  712. ThrottleTime: 0,
  713. }
  714. prodSuccessResponse.AddTopicPartition("my_topic", 0, ErrNoError)
  715. prodCounter := 0
  716. lastBatchFirstSeq := -1
  717. lastBatchSize := -1
  718. lastSequenceWrittenToDisk := -1
  719. handlerFailBeforeWrite := func(req *request) (res encoder) {
  720. switch req.body.key() {
  721. case 3:
  722. return metadataResponse
  723. case 22:
  724. return initProducerIDResponse
  725. case 0:
  726. prodCounter++
  727. preq := req.body.(*ProduceRequest)
  728. batch := preq.records["my_topic"][0].RecordBatch
  729. batchFirstSeq := int(batch.FirstSequence)
  730. batchSize := len(batch.Records)
  731. if lastSequenceWrittenToDisk == batchFirstSeq-1 { //in sequence append
  732. if lastBatchFirstSeq == batchFirstSeq { //is a batch retry
  733. if lastBatchSize == batchSize { //good retry
  734. // mock write to disk
  735. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  736. return prodSuccessResponse
  737. }
  738. t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
  739. return prodOutOfSeq
  740. } else { // not a retry
  741. // save batch just received for future check
  742. lastBatchFirstSeq = batchFirstSeq
  743. lastBatchSize = batchSize
  744. if prodCounter%2 == 1 {
  745. if test.failAfterWrite {
  746. // mock write to disk
  747. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  748. }
  749. return prodNotLeaderResponse
  750. }
  751. // mock write to disk
  752. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  753. return prodSuccessResponse
  754. }
  755. } else {
  756. if lastBatchFirstSeq == batchFirstSeq && lastBatchSize == batchSize { // is a good batch retry
  757. if lastSequenceWrittenToDisk == (batchFirstSeq + batchSize - 1) { // we already have the messages
  758. return prodDuplicate
  759. }
  760. // mock write to disk
  761. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  762. return prodSuccessResponse
  763. } else { //out of sequence / bad retried batch
  764. if lastBatchFirstSeq == batchFirstSeq && lastBatchSize != batchSize {
  765. t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
  766. } else if lastSequenceWrittenToDisk+1 != batchFirstSeq {
  767. t.Errorf("[%s] Out of sequence message lastSequence=%d new batch starts at=%d", test.name, lastSequenceWrittenToDisk, batchFirstSeq)
  768. } else {
  769. t.Errorf("[%s] Unexpected error", test.name)
  770. }
  771. return prodOutOfSeq
  772. }
  773. }
  774. }
  775. return nil
  776. }
  777. config := NewConfig()
  778. config.Version = V0_11_0_0
  779. config.Producer.Idempotent = true
  780. config.Net.MaxOpenRequests = 1
  781. config.Producer.RequiredAcks = WaitForAll
  782. config.Producer.Return.Successes = true
  783. config.Producer.Flush.Frequency = 50 * time.Millisecond
  784. config.Producer.Retry.Backoff = 100 * time.Millisecond
  785. broker.setHandler(handlerFailBeforeWrite)
  786. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  787. if err != nil {
  788. t.Fatal(err)
  789. }
  790. for i := 0; i < 3; i++ {
  791. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  792. }
  793. go func() {
  794. for i := 0; i < 7; i++ {
  795. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("goroutine")}
  796. time.Sleep(100 * time.Millisecond)
  797. }
  798. }()
  799. expectResults(t, producer, 10, 0)
  800. broker.Close()
  801. closeProducer(t, producer)
  802. }
  803. }
  804. func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
  805. broker := NewMockBroker(t, 1)
  806. metadataResponse := &MetadataResponse{
  807. Version: 1,
  808. ControllerID: 1,
  809. }
  810. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  811. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
  812. broker.Returns(metadataResponse)
  813. initProducerID := &InitProducerIDResponse{
  814. ThrottleTime: 0,
  815. ProducerID: 1000,
  816. ProducerEpoch: 1,
  817. }
  818. broker.Returns(initProducerID)
  819. config := NewConfig()
  820. config.Producer.Flush.Messages = 10
  821. config.Producer.Return.Successes = true
  822. config.Producer.Retry.Max = 400000
  823. config.Producer.RequiredAcks = WaitForAll
  824. config.Producer.Retry.Backoff = 0
  825. config.Producer.Idempotent = true
  826. config.Net.MaxOpenRequests = 1
  827. config.Version = V0_11_0_0
  828. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  829. if err != nil {
  830. t.Fatal(err)
  831. }
  832. for i := 0; i < 10; i++ {
  833. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  834. }
  835. prodOutOfSeq := &ProduceResponse{
  836. Version: 3,
  837. ThrottleTime: 0,
  838. }
  839. prodOutOfSeq.AddTopicPartition("my_topic", 0, ErrOutOfOrderSequenceNumber)
  840. broker.Returns(prodOutOfSeq)
  841. expectResults(t, producer, 0, 10)
  842. broker.Close()
  843. closeProducer(t, producer)
  844. }
  845. // This example shows how to use the producer while simultaneously
  846. // reading the Errors channel to know about any failures.
  847. func ExampleAsyncProducer_select() {
  848. producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
  849. if err != nil {
  850. panic(err)
  851. }
  852. defer func() {
  853. if err := producer.Close(); err != nil {
  854. log.Fatalln(err)
  855. }
  856. }()
  857. // Trap SIGINT to trigger a shutdown.
  858. signals := make(chan os.Signal, 1)
  859. signal.Notify(signals, os.Interrupt)
  860. var enqueued, errors int
  861. ProducerLoop:
  862. for {
  863. select {
  864. case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
  865. enqueued++
  866. case err := <-producer.Errors():
  867. log.Println("Failed to produce message", err)
  868. errors++
  869. case <-signals:
  870. break ProducerLoop
  871. }
  872. }
  873. log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
  874. }
  875. // This example shows how to use the producer with separate goroutines
  876. // reading from the Successes and Errors channels. Note that in order
  877. // for the Successes channel to be populated, you have to set
  878. // config.Producer.Return.Successes to true.
  879. func ExampleAsyncProducer_goroutines() {
  880. config := NewConfig()
  881. config.Producer.Return.Successes = true
  882. producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
  883. if err != nil {
  884. panic(err)
  885. }
  886. // Trap SIGINT to trigger a graceful shutdown.
  887. signals := make(chan os.Signal, 1)
  888. signal.Notify(signals, os.Interrupt)
  889. var (
  890. wg sync.WaitGroup
  891. enqueued, successes, errors int
  892. )
  893. wg.Add(1)
  894. go func() {
  895. defer wg.Done()
  896. for range producer.Successes() {
  897. successes++
  898. }
  899. }()
  900. wg.Add(1)
  901. go func() {
  902. defer wg.Done()
  903. for err := range producer.Errors() {
  904. log.Println(err)
  905. errors++
  906. }
  907. }()
  908. ProducerLoop:
  909. for {
  910. message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
  911. select {
  912. case producer.Input() <- message:
  913. enqueued++
  914. case <-signals:
  915. producer.AsyncClose() // Trigger a shutdown of the producer.
  916. break ProducerLoop
  917. }
  918. }
  919. wg.Wait()
  920. log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
  921. }