async_producer_test.go 31 KB


  1. package sarama
  2. import (
  3. "errors"
  4. "log"
  5. "os"
  6. "os/signal"
  7. "sync"
  8. "testing"
  9. "time"
  10. )
// TestMessage is the payload used as the value of the messages produced by
// the tests in this file.
const TestMessage = "ABC THE MESSAGE"
  12. func closeProducer(t *testing.T, p AsyncProducer) {
  13. var wg sync.WaitGroup
  14. p.AsyncClose()
  15. wg.Add(2)
  16. go func() {
  17. for range p.Successes() {
  18. t.Error("Unexpected message on Successes()")
  19. }
  20. wg.Done()
  21. }()
  22. go func() {
  23. for msg := range p.Errors() {
  24. t.Error(msg.Err)
  25. }
  26. wg.Done()
  27. }()
  28. wg.Wait()
  29. }
  30. func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
  31. expect := successes + errors
  32. for expect > 0 {
  33. select {
  34. case msg := <-p.Errors():
  35. if msg.Msg.flags != 0 {
  36. t.Error("Message had flags set")
  37. }
  38. errors--
  39. expect--
  40. if errors < 0 {
  41. t.Error(msg.Err)
  42. }
  43. case msg := <-p.Successes():
  44. if msg.flags != 0 {
  45. t.Error("Message had flags set")
  46. }
  47. successes--
  48. expect--
  49. if successes < 0 {
  50. t.Error("Too many successes")
  51. }
  52. }
  53. }
  54. if successes != 0 || errors != 0 {
  55. t.Error("Unexpected successes", successes, "or errors", errors)
  56. }
  57. }
  58. type testPartitioner chan *int32
  59. func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
  60. part := <-p
  61. if part == nil {
  62. return 0, errors.New("BOOM")
  63. }
  64. return *part, nil
  65. }
  66. func (p testPartitioner) RequiresConsistency() bool {
  67. return true
  68. }
  69. func (p testPartitioner) feed(partition int32) {
  70. p <- &partition
  71. }
  72. type flakyEncoder bool
  73. func (f flakyEncoder) Length() int {
  74. return len(TestMessage)
  75. }
  76. func (f flakyEncoder) Encode() ([]byte, error) {
  77. if !bool(f) {
  78. return nil, errors.New("flaky encoding error")
  79. }
  80. return []byte(TestMessage), nil
  81. }
  82. func TestAsyncProducer(t *testing.T) {
  83. seedBroker := NewMockBroker(t, 1)
  84. leader := NewMockBroker(t, 2)
  85. metadataResponse := new(MetadataResponse)
  86. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  87. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  88. seedBroker.Returns(metadataResponse)
  89. prodSuccess := new(ProduceResponse)
  90. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  91. leader.Returns(prodSuccess)
  92. config := NewConfig()
  93. config.Producer.Flush.Messages = 10
  94. config.Producer.Return.Successes = true
  95. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  96. if err != nil {
  97. t.Fatal(err)
  98. }
  99. for i := 0; i < 10; i++ {
  100. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
  101. }
  102. for i := 0; i < 10; i++ {
  103. select {
  104. case msg := <-producer.Errors():
  105. t.Error(msg.Err)
  106. if msg.Msg.flags != 0 {
  107. t.Error("Message had flags set")
  108. }
  109. case msg := <-producer.Successes():
  110. if msg.flags != 0 {
  111. t.Error("Message had flags set")
  112. }
  113. if msg.Metadata.(int) != i {
  114. t.Error("Message metadata did not match")
  115. }
  116. case <-time.After(time.Second):
  117. t.Errorf("Timeout waiting for msg #%d", i)
  118. goto done
  119. }
  120. }
  121. done:
  122. closeProducer(t, producer)
  123. leader.Close()
  124. seedBroker.Close()
  125. }
  126. func TestAsyncProducerMultipleFlushes(t *testing.T) {
  127. seedBroker := NewMockBroker(t, 1)
  128. leader := NewMockBroker(t, 2)
  129. metadataResponse := new(MetadataResponse)
  130. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  131. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  132. seedBroker.Returns(metadataResponse)
  133. prodSuccess := new(ProduceResponse)
  134. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  135. leader.Returns(prodSuccess)
  136. leader.Returns(prodSuccess)
  137. leader.Returns(prodSuccess)
  138. config := NewConfig()
  139. config.Producer.Flush.Messages = 5
  140. config.Producer.Return.Successes = true
  141. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  142. if err != nil {
  143. t.Fatal(err)
  144. }
  145. for flush := 0; flush < 3; flush++ {
  146. for i := 0; i < 5; i++ {
  147. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  148. }
  149. expectResults(t, producer, 5, 0)
  150. }
  151. closeProducer(t, producer)
  152. leader.Close()
  153. seedBroker.Close()
  154. }
  155. func TestAsyncProducerMultipleBrokers(t *testing.T) {
  156. seedBroker := NewMockBroker(t, 1)
  157. leader0 := NewMockBroker(t, 2)
  158. leader1 := NewMockBroker(t, 3)
  159. metadataResponse := new(MetadataResponse)
  160. metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
  161. metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
  162. metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
  163. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  164. seedBroker.Returns(metadataResponse)
  165. prodResponse0 := new(ProduceResponse)
  166. prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError)
  167. leader0.Returns(prodResponse0)
  168. prodResponse1 := new(ProduceResponse)
  169. prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
  170. leader1.Returns(prodResponse1)
  171. config := NewConfig()
  172. config.Producer.Flush.Messages = 5
  173. config.Producer.Return.Successes = true
  174. config.Producer.Partitioner = NewRoundRobinPartitioner
  175. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  176. if err != nil {
  177. t.Fatal(err)
  178. }
  179. for i := 0; i < 10; i++ {
  180. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  181. }
  182. expectResults(t, producer, 10, 0)
  183. closeProducer(t, producer)
  184. leader1.Close()
  185. leader0.Close()
  186. seedBroker.Close()
  187. }
  188. func TestAsyncProducerCustomPartitioner(t *testing.T) {
  189. seedBroker := NewMockBroker(t, 1)
  190. leader := NewMockBroker(t, 2)
  191. metadataResponse := new(MetadataResponse)
  192. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  193. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  194. seedBroker.Returns(metadataResponse)
  195. prodResponse := new(ProduceResponse)
  196. prodResponse.AddTopicPartition("my_topic", 0, ErrNoError)
  197. leader.Returns(prodResponse)
  198. config := NewConfig()
  199. config.Producer.Flush.Messages = 2
  200. config.Producer.Return.Successes = true
  201. config.Producer.Partitioner = func(topic string) Partitioner {
  202. p := make(testPartitioner)
  203. go func() {
  204. p.feed(0)
  205. p <- nil
  206. p <- nil
  207. p <- nil
  208. p.feed(0)
  209. }()
  210. return p
  211. }
  212. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  213. if err != nil {
  214. t.Fatal(err)
  215. }
  216. for i := 0; i < 5; i++ {
  217. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  218. }
  219. expectResults(t, producer, 2, 3)
  220. closeProducer(t, producer)
  221. leader.Close()
  222. seedBroker.Close()
  223. }
  224. func TestAsyncProducerFailureRetry(t *testing.T) {
  225. seedBroker := NewMockBroker(t, 1)
  226. leader1 := NewMockBroker(t, 2)
  227. leader2 := NewMockBroker(t, 3)
  228. metadataLeader1 := new(MetadataResponse)
  229. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  230. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  231. seedBroker.Returns(metadataLeader1)
  232. config := NewConfig()
  233. config.Producer.Flush.Messages = 10
  234. config.Producer.Return.Successes = true
  235. config.Producer.Retry.Backoff = 0
  236. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  237. if err != nil {
  238. t.Fatal(err)
  239. }
  240. seedBroker.Close()
  241. for i := 0; i < 10; i++ {
  242. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  243. }
  244. prodNotLeader := new(ProduceResponse)
  245. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  246. leader1.Returns(prodNotLeader)
  247. metadataLeader2 := new(MetadataResponse)
  248. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  249. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  250. leader1.Returns(metadataLeader2)
  251. prodSuccess := new(ProduceResponse)
  252. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  253. leader2.Returns(prodSuccess)
  254. expectResults(t, producer, 10, 0)
  255. leader1.Close()
  256. for i := 0; i < 10; i++ {
  257. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  258. }
  259. leader2.Returns(metadataLeader2)
  260. leader2.Returns(prodSuccess)
  261. expectResults(t, producer, 10, 0)
  262. leader2.Close()
  263. closeProducer(t, producer)
  264. }
  265. func TestAsyncProducerEncoderFailures(t *testing.T) {
  266. seedBroker := NewMockBroker(t, 1)
  267. leader := NewMockBroker(t, 2)
  268. metadataResponse := new(MetadataResponse)
  269. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  270. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  271. seedBroker.Returns(metadataResponse)
  272. prodSuccess := new(ProduceResponse)
  273. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  274. leader.Returns(prodSuccess)
  275. leader.Returns(prodSuccess)
  276. leader.Returns(prodSuccess)
  277. config := NewConfig()
  278. config.Producer.Flush.Messages = 1
  279. config.Producer.Return.Successes = true
  280. config.Producer.Partitioner = NewManualPartitioner
  281. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  282. if err != nil {
  283. t.Fatal(err)
  284. }
  285. for flush := 0; flush < 3; flush++ {
  286. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)}
  287. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)}
  288. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
  289. expectResults(t, producer, 1, 2)
  290. }
  291. closeProducer(t, producer)
  292. leader.Close()
  293. seedBroker.Close()
  294. }
  295. // If a Kafka broker becomes unavailable and then returns back in service, then
  296. // producer reconnects to it and continues sending messages.
  297. func TestAsyncProducerBrokerBounce(t *testing.T) {
  298. // Given
  299. seedBroker := NewMockBroker(t, 1)
  300. leader := NewMockBroker(t, 2)
  301. leaderAddr := leader.Addr()
  302. metadataResponse := new(MetadataResponse)
  303. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  304. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  305. seedBroker.Returns(metadataResponse)
  306. prodSuccess := new(ProduceResponse)
  307. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  308. config := NewConfig()
  309. config.Producer.Flush.Messages = 1
  310. config.Producer.Return.Successes = true
  311. config.Producer.Retry.Backoff = 0
  312. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  313. if err != nil {
  314. t.Fatal(err)
  315. }
  316. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  317. leader.Returns(prodSuccess)
  318. expectResults(t, producer, 1, 0)
  319. // When: a broker connection gets reset by a broker (network glitch, restart, you name it).
  320. leader.Close() // producer should get EOF
  321. leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
  322. seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again
  323. // Then: a produced message goes through the new broker connection.
  324. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  325. leader.Returns(prodSuccess)
  326. expectResults(t, producer, 1, 0)
  327. closeProducer(t, producer)
  328. seedBroker.Close()
  329. leader.Close()
  330. }
  331. func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
  332. seedBroker := NewMockBroker(t, 1)
  333. leader1 := NewMockBroker(t, 2)
  334. leader2 := NewMockBroker(t, 3)
  335. metadataLeader1 := new(MetadataResponse)
  336. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  337. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  338. seedBroker.Returns(metadataLeader1)
  339. config := NewConfig()
  340. config.Producer.Flush.Messages = 10
  341. config.Producer.Return.Successes = true
  342. config.Producer.Retry.Max = 3
  343. config.Producer.Retry.Backoff = 0
  344. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  345. if err != nil {
  346. t.Fatal(err)
  347. }
  348. for i := 0; i < 10; i++ {
  349. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  350. }
  351. leader1.Close() // producer should get EOF
  352. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  353. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  354. // ok fine, tell it to go to leader2 finally
  355. metadataLeader2 := new(MetadataResponse)
  356. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  357. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  358. seedBroker.Returns(metadataLeader2)
  359. prodSuccess := new(ProduceResponse)
  360. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  361. leader2.Returns(prodSuccess)
  362. expectResults(t, producer, 10, 0)
  363. seedBroker.Close()
  364. leader2.Close()
  365. closeProducer(t, producer)
  366. }
  367. func TestAsyncProducerMultipleRetries(t *testing.T) {
  368. seedBroker := NewMockBroker(t, 1)
  369. leader1 := NewMockBroker(t, 2)
  370. leader2 := NewMockBroker(t, 3)
  371. metadataLeader1 := new(MetadataResponse)
  372. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  373. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  374. seedBroker.Returns(metadataLeader1)
  375. config := NewConfig()
  376. config.Producer.Flush.Messages = 10
  377. config.Producer.Return.Successes = true
  378. config.Producer.Retry.Max = 4
  379. config.Producer.Retry.Backoff = 0
  380. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  381. if err != nil {
  382. t.Fatal(err)
  383. }
  384. for i := 0; i < 10; i++ {
  385. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  386. }
  387. prodNotLeader := new(ProduceResponse)
  388. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  389. leader1.Returns(prodNotLeader)
  390. metadataLeader2 := new(MetadataResponse)
  391. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  392. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  393. seedBroker.Returns(metadataLeader2)
  394. leader2.Returns(prodNotLeader)
  395. seedBroker.Returns(metadataLeader1)
  396. leader1.Returns(prodNotLeader)
  397. seedBroker.Returns(metadataLeader1)
  398. leader1.Returns(prodNotLeader)
  399. seedBroker.Returns(metadataLeader2)
  400. seedBroker.Returns(metadataLeader2)
  401. prodSuccess := new(ProduceResponse)
  402. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  403. leader2.Returns(prodSuccess)
  404. expectResults(t, producer, 10, 0)
  405. for i := 0; i < 10; i++ {
  406. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  407. }
  408. leader2.Returns(prodSuccess)
  409. expectResults(t, producer, 10, 0)
  410. seedBroker.Close()
  411. leader1.Close()
  412. leader2.Close()
  413. closeProducer(t, producer)
  414. }
  415. func TestAsyncProducerOutOfRetries(t *testing.T) {
  416. t.Skip("Enable once bug #294 is fixed.")
  417. seedBroker := NewMockBroker(t, 1)
  418. leader := NewMockBroker(t, 2)
  419. metadataResponse := new(MetadataResponse)
  420. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  421. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  422. seedBroker.Returns(metadataResponse)
  423. config := NewConfig()
  424. config.Producer.Flush.Messages = 10
  425. config.Producer.Return.Successes = true
  426. config.Producer.Retry.Backoff = 0
  427. config.Producer.Retry.Max = 0
  428. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  429. if err != nil {
  430. t.Fatal(err)
  431. }
  432. for i := 0; i < 10; i++ {
  433. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  434. }
  435. prodNotLeader := new(ProduceResponse)
  436. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  437. leader.Returns(prodNotLeader)
  438. for i := 0; i < 10; i++ {
  439. select {
  440. case msg := <-producer.Errors():
  441. if msg.Err != ErrNotLeaderForPartition {
  442. t.Error(msg.Err)
  443. }
  444. case <-producer.Successes():
  445. t.Error("Unexpected success")
  446. }
  447. }
  448. seedBroker.Returns(metadataResponse)
  449. for i := 0; i < 10; i++ {
  450. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  451. }
  452. prodSuccess := new(ProduceResponse)
  453. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  454. leader.Returns(prodSuccess)
  455. expectResults(t, producer, 10, 0)
  456. leader.Close()
  457. seedBroker.Close()
  458. safeClose(t, producer)
  459. }
  460. func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
  461. seedBroker := NewMockBroker(t, 1)
  462. leader := NewMockBroker(t, 2)
  463. leaderAddr := leader.Addr()
  464. metadataResponse := new(MetadataResponse)
  465. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  466. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  467. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
  468. seedBroker.Returns(metadataResponse)
  469. config := NewConfig()
  470. config.Producer.Return.Successes = true
  471. config.Producer.Retry.Backoff = 0
  472. config.Producer.Retry.Max = 1
  473. config.Producer.Partitioner = NewRoundRobinPartitioner
  474. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  475. if err != nil {
  476. t.Fatal(err)
  477. }
  478. // prime partition 0
  479. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  480. prodSuccess := new(ProduceResponse)
  481. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  482. leader.Returns(prodSuccess)
  483. expectResults(t, producer, 1, 0)
  484. // prime partition 1
  485. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  486. prodSuccess = new(ProduceResponse)
  487. prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
  488. leader.Returns(prodSuccess)
  489. expectResults(t, producer, 1, 0)
  490. // reboot the broker (the producer will get EOF on its existing connection)
  491. leader.Close()
  492. leader = NewMockBrokerAddr(t, 2, leaderAddr)
  493. // send another message on partition 0 to trigger the EOF and retry
  494. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  495. // tell partition 0 to go to that broker again
  496. seedBroker.Returns(metadataResponse)
  497. // succeed this time
  498. prodSuccess = new(ProduceResponse)
  499. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  500. leader.Returns(prodSuccess)
  501. expectResults(t, producer, 1, 0)
  502. // shutdown
  503. closeProducer(t, producer)
  504. seedBroker.Close()
  505. leader.Close()
  506. }
  507. func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
  508. seedBroker := NewMockBroker(t, 1)
  509. leader := NewMockBroker(t, 2)
  510. metadataResponse := new(MetadataResponse)
  511. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  512. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  513. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
  514. seedBroker.Returns(metadataResponse)
  515. config := NewConfig()
  516. config.Producer.Flush.Messages = 5
  517. config.Producer.Return.Successes = true
  518. config.Producer.Retry.Backoff = 0
  519. config.Producer.Retry.Max = 1
  520. config.Producer.Partitioner = NewManualPartitioner
  521. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  522. if err != nil {
  523. t.Fatal(err)
  524. }
  525. // prime partitions
  526. for p := int32(0); p < 2; p++ {
  527. for i := 0; i < 5; i++ {
  528. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p}
  529. }
  530. prodSuccess := new(ProduceResponse)
  531. prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
  532. leader.Returns(prodSuccess)
  533. expectResults(t, producer, 5, 0)
  534. }
  535. // send more messages on partition 0
  536. for i := 0; i < 5; i++ {
  537. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  538. }
  539. prodNotLeader := new(ProduceResponse)
  540. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  541. leader.Returns(prodNotLeader)
  542. time.Sleep(50 * time.Millisecond)
  543. leader.SetHandlerByMap(map[string]MockResponse{
  544. "ProduceRequest": NewMockProduceResponse(t).
  545. SetVersion(0).
  546. SetError("my_topic", 0, ErrNoError),
  547. })
  548. // tell partition 0 to go to that broker again
  549. seedBroker.Returns(metadataResponse)
  550. // succeed this time
  551. expectResults(t, producer, 5, 0)
  552. seedBroker.Returns(metadataResponse)
  553. // put five more through
  554. for i := 0; i < 5; i++ {
  555. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  556. }
  557. expectResults(t, producer, 5, 0)
  558. // shutdown
  559. closeProducer(t, producer)
  560. seedBroker.Close()
  561. leader.Close()
  562. }
  563. func TestAsyncProducerRetryShutdown(t *testing.T) {
  564. seedBroker := NewMockBroker(t, 1)
  565. leader := NewMockBroker(t, 2)
  566. metadataLeader := new(MetadataResponse)
  567. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  568. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  569. seedBroker.Returns(metadataLeader)
  570. config := NewConfig()
  571. config.Producer.Flush.Messages = 10
  572. config.Producer.Return.Successes = true
  573. config.Producer.Retry.Backoff = 0
  574. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  575. if err != nil {
  576. t.Fatal(err)
  577. }
  578. for i := 0; i < 10; i++ {
  579. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  580. }
  581. producer.AsyncClose()
  582. time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in
  583. producer.Input() <- &ProducerMessage{Topic: "FOO"}
  584. if err := <-producer.Errors(); err.Err != ErrShuttingDown {
  585. t.Error(err)
  586. }
  587. prodNotLeader := new(ProduceResponse)
  588. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  589. leader.Returns(prodNotLeader)
  590. seedBroker.Returns(metadataLeader)
  591. prodSuccess := new(ProduceResponse)
  592. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  593. leader.Returns(prodSuccess)
  594. expectResults(t, producer, 10, 0)
  595. seedBroker.Close()
  596. leader.Close()
  597. // wait for the async-closed producer to shut down fully
  598. for err := range producer.Errors() {
  599. t.Error(err)
  600. }
  601. }
  602. func TestAsyncProducerNoReturns(t *testing.T) {
  603. seedBroker := NewMockBroker(t, 1)
  604. leader := NewMockBroker(t, 2)
  605. metadataLeader := new(MetadataResponse)
  606. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  607. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  608. seedBroker.Returns(metadataLeader)
  609. config := NewConfig()
  610. config.Producer.Flush.Messages = 10
  611. config.Producer.Return.Successes = false
  612. config.Producer.Return.Errors = false
  613. config.Producer.Retry.Backoff = 0
  614. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  615. if err != nil {
  616. t.Fatal(err)
  617. }
  618. for i := 0; i < 10; i++ {
  619. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  620. }
  621. wait := make(chan bool)
  622. go func() {
  623. if err := producer.Close(); err != nil {
  624. t.Error(err)
  625. }
  626. close(wait)
  627. }()
  628. prodSuccess := new(ProduceResponse)
  629. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  630. leader.Returns(prodSuccess)
  631. <-wait
  632. seedBroker.Close()
  633. leader.Close()
  634. }
  635. func TestAsyncProducerIdempotentGoldenPath(t *testing.T) {
  636. broker := NewMockBroker(t, 1)
  637. metadataResponse := &MetadataResponse{
  638. Version: 1,
  639. ControllerID: 1,
  640. }
  641. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  642. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
  643. broker.Returns(metadataResponse)
  644. initProducerID := &InitProducerIDResponse{
  645. ThrottleTime: 0,
  646. ProducerID: 1000,
  647. ProducerEpoch: 1,
  648. }
  649. broker.Returns(initProducerID)
  650. config := NewConfig()
  651. config.Producer.Flush.Messages = 10
  652. config.Producer.Return.Successes = true
  653. config.Producer.Retry.Max = 4
  654. config.Producer.RequiredAcks = WaitForAll
  655. config.Producer.Retry.Backoff = 0
  656. config.Producer.Idempotent = true
  657. config.Net.MaxOpenRequests = 1
  658. config.Version = V0_11_0_0
  659. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  660. if err != nil {
  661. t.Fatal(err)
  662. }
  663. for i := 0; i < 10; i++ {
  664. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  665. }
  666. prodSuccess := &ProduceResponse{
  667. Version: 3,
  668. ThrottleTime: 0,
  669. }
  670. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  671. broker.Returns(prodSuccess)
  672. expectResults(t, producer, 10, 0)
  673. broker.Close()
  674. closeProducer(t, producer)
  675. }
  676. func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) {
  677. //Logger = log.New(os.Stderr, "", log.LstdFlags)
  678. tests := []struct {
  679. name string
  680. failAfterWrite bool
  681. }{
  682. {"FailAfterWrite", true},
  683. {"FailBeforeWrite", false},
  684. }
  685. for _, test := range tests {
  686. broker := NewMockBroker(t, 1)
  687. metadataResponse := &MetadataResponse{
  688. Version: 1,
  689. ControllerID: 1,
  690. }
  691. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  692. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
  693. initProducerIDResponse := &InitProducerIDResponse{
  694. ThrottleTime: 0,
  695. ProducerID: 1000,
  696. ProducerEpoch: 1,
  697. }
  698. prodNotLeaderResponse := &ProduceResponse{
  699. Version: 3,
  700. ThrottleTime: 0,
  701. }
  702. prodNotLeaderResponse.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas)
  703. prodDuplicate := &ProduceResponse{
  704. Version: 3,
  705. ThrottleTime: 0,
  706. }
  707. prodDuplicate.AddTopicPartition("my_topic", 0, ErrDuplicateSequenceNumber)
  708. prodOutOfSeq := &ProduceResponse{
  709. Version: 3,
  710. ThrottleTime: 0,
  711. }
  712. prodOutOfSeq.AddTopicPartition("my_topic", 0, ErrOutOfOrderSequenceNumber)
  713. prodSuccessResponse := &ProduceResponse{
  714. Version: 3,
  715. ThrottleTime: 0,
  716. }
  717. prodSuccessResponse.AddTopicPartition("my_topic", 0, ErrNoError)
  718. prodCounter := 0
  719. lastBatchFirstSeq := -1
  720. lastBatchSize := -1
  721. lastSequenceWrittenToDisk := -1
  722. handlerFailBeforeWrite := func(req *request) (res encoder) {
  723. switch req.body.key() {
  724. case 3:
  725. return metadataResponse
  726. case 22:
  727. return initProducerIDResponse
  728. case 0:
  729. prodCounter++
  730. preq := req.body.(*ProduceRequest)
  731. batch := preq.records["my_topic"][0].RecordBatch
  732. batchFirstSeq := int(batch.FirstSequence)
  733. batchSize := len(batch.Records)
  734. if lastSequenceWrittenToDisk == batchFirstSeq-1 { //in sequence append
  735. if lastBatchFirstSeq == batchFirstSeq { //is a batch retry
  736. if lastBatchSize == batchSize { //good retry
  737. // mock write to disk
  738. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  739. return prodSuccessResponse
  740. }
  741. t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
  742. return prodOutOfSeq
  743. } else { // not a retry
  744. // save batch just received for future check
  745. lastBatchFirstSeq = batchFirstSeq
  746. lastBatchSize = batchSize
  747. if prodCounter%2 == 1 {
  748. if test.failAfterWrite {
  749. // mock write to disk
  750. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  751. }
  752. return prodNotLeaderResponse
  753. }
  754. // mock write to disk
  755. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  756. return prodSuccessResponse
  757. }
  758. } else {
  759. if lastBatchFirstSeq == batchFirstSeq && lastBatchSize == batchSize { // is a good batch retry
  760. if lastSequenceWrittenToDisk == (batchFirstSeq + batchSize - 1) { // we already have the messages
  761. return prodDuplicate
  762. }
  763. // mock write to disk
  764. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  765. return prodSuccessResponse
  766. } else { //out of sequence / bad retried batch
  767. if lastBatchFirstSeq == batchFirstSeq && lastBatchSize != batchSize {
  768. t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
  769. } else if lastSequenceWrittenToDisk+1 != batchFirstSeq {
  770. t.Errorf("[%s] Out of sequence message lastSequence=%d new batch starts at=%d", test.name, lastSequenceWrittenToDisk, batchFirstSeq)
  771. } else {
  772. t.Errorf("[%s] Unexpected error", test.name)
  773. }
  774. return prodOutOfSeq
  775. }
  776. }
  777. }
  778. return nil
  779. }
  780. config := NewConfig()
  781. config.Version = V0_11_0_0
  782. config.Producer.Idempotent = true
  783. config.Net.MaxOpenRequests = 1
  784. config.Producer.RequiredAcks = WaitForAll
  785. config.Producer.Return.Successes = true
  786. config.Producer.Flush.Frequency = 50 * time.Millisecond
  787. config.Producer.Retry.Backoff = 100 * time.Millisecond
  788. broker.setHandler(handlerFailBeforeWrite)
  789. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  790. if err != nil {
  791. t.Fatal(err)
  792. }
  793. for i := 0; i < 3; i++ {
  794. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  795. }
  796. go func() {
  797. for i := 0; i < 7; i++ {
  798. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("goroutine")}
  799. time.Sleep(100 * time.Millisecond)
  800. }
  801. }()
  802. expectResults(t, producer, 10, 0)
  803. broker.Close()
  804. closeProducer(t, producer)
  805. }
  806. }
  807. func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
  808. broker := NewMockBroker(t, 1)
  809. metadataResponse := &MetadataResponse{
  810. Version: 1,
  811. ControllerID: 1,
  812. }
  813. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  814. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
  815. broker.Returns(metadataResponse)
  816. initProducerID := &InitProducerIDResponse{
  817. ThrottleTime: 0,
  818. ProducerID: 1000,
  819. ProducerEpoch: 1,
  820. }
  821. broker.Returns(initProducerID)
  822. config := NewConfig()
  823. config.Producer.Flush.Messages = 10
  824. config.Producer.Return.Successes = true
  825. config.Producer.Retry.Max = 400000
  826. config.Producer.RequiredAcks = WaitForAll
  827. config.Producer.Retry.Backoff = 0
  828. config.Producer.Idempotent = true
  829. config.Net.MaxOpenRequests = 1
  830. config.Version = V0_11_0_0
  831. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  832. if err != nil {
  833. t.Fatal(err)
  834. }
  835. for i := 0; i < 10; i++ {
  836. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  837. }
  838. prodOutOfSeq := &ProduceResponse{
  839. Version: 3,
  840. ThrottleTime: 0,
  841. }
  842. prodOutOfSeq.AddTopicPartition("my_topic", 0, ErrOutOfOrderSequenceNumber)
  843. broker.Returns(prodOutOfSeq)
  844. expectResults(t, producer, 0, 10)
  845. broker.Close()
  846. closeProducer(t, producer)
  847. }
  848. // This example shows how to use the producer while simultaneously
  849. // reading the Errors channel to know about any failures.
  850. func ExampleAsyncProducer_select() {
  851. producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
  852. if err != nil {
  853. panic(err)
  854. }
  855. defer func() {
  856. if err := producer.Close(); err != nil {
  857. log.Fatalln(err)
  858. }
  859. }()
  860. // Trap SIGINT to trigger a shutdown.
  861. signals := make(chan os.Signal, 1)
  862. signal.Notify(signals, os.Interrupt)
  863. var enqueued, errors int
  864. ProducerLoop:
  865. for {
  866. select {
  867. case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
  868. enqueued++
  869. case err := <-producer.Errors():
  870. log.Println("Failed to produce message", err)
  871. errors++
  872. case <-signals:
  873. break ProducerLoop
  874. }
  875. }
  876. log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
  877. }
  878. // This example shows how to use the producer with separate goroutines
  879. // reading from the Successes and Errors channels. Note that in order
  880. // for the Successes channel to be populated, you have to set
  881. // config.Producer.Return.Successes to true.
  882. func ExampleAsyncProducer_goroutines() {
  883. config := NewConfig()
  884. config.Producer.Return.Successes = true
  885. producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
  886. if err != nil {
  887. panic(err)
  888. }
  889. // Trap SIGINT to trigger a graceful shutdown.
  890. signals := make(chan os.Signal, 1)
  891. signal.Notify(signals, os.Interrupt)
  892. var (
  893. wg sync.WaitGroup
  894. enqueued, successes, errors int
  895. )
  896. wg.Add(1)
  897. go func() {
  898. defer wg.Done()
  899. for range producer.Successes() {
  900. successes++
  901. }
  902. }()
  903. wg.Add(1)
  904. go func() {
  905. defer wg.Done()
  906. for err := range producer.Errors() {
  907. log.Println(err)
  908. errors++
  909. }
  910. }()
  911. ProducerLoop:
  912. for {
  913. message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
  914. select {
  915. case producer.Input() <- message:
  916. enqueued++
  917. case <-signals:
  918. producer.AsyncClose() // Trigger a shutdown of the producer.
  919. break ProducerLoop
  920. }
  921. }
  922. wg.Wait()
  923. log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
  924. }