async_producer_test.go 33 KB


  1. package sarama
  2. import (
  3. "errors"
  4. "log"
  5. "os"
  6. "os/signal"
  7. "sync"
  8. "testing"
  9. "time"
  10. )
  // TestMessage is the payload carried by the messages produced in the tests
  // of this file; flakyEncoder also uses it for its encoded bytes and length.
  11. const TestMessage = "ABC THE MESSAGE"
  12. func closeProducer(t *testing.T, p AsyncProducer) {
  13. var wg sync.WaitGroup
  14. p.AsyncClose()
  15. wg.Add(2)
  16. go func() {
  17. for range p.Successes() {
  18. t.Error("Unexpected message on Successes()")
  19. }
  20. wg.Done()
  21. }()
  22. go func() {
  23. for msg := range p.Errors() {
  24. t.Error(msg.Err)
  25. }
  26. wg.Done()
  27. }()
  28. wg.Wait()
  29. }
  30. func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
  31. expect := successes + errors
  32. for expect > 0 {
  33. select {
  34. case msg := <-p.Errors():
  35. if msg.Msg.flags != 0 {
  36. t.Error("Message had flags set")
  37. }
  38. errors--
  39. expect--
  40. if errors < 0 {
  41. t.Error(msg.Err)
  42. }
  43. case msg := <-p.Successes():
  44. if msg.flags != 0 {
  45. t.Error("Message had flags set")
  46. }
  47. successes--
  48. expect--
  49. if successes < 0 {
  50. t.Error("Too many successes")
  51. }
  52. }
  53. }
  54. if successes != 0 || errors != 0 {
  55. t.Error("Unexpected successes", successes, "or errors", errors)
  56. }
  57. }
  58. type testPartitioner chan *int32
  59. func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
  60. part := <-p
  61. if part == nil {
  62. return 0, errors.New("BOOM")
  63. }
  64. return *part, nil
  65. }
  66. func (p testPartitioner) RequiresConsistency() bool {
  67. return true
  68. }
  69. func (p testPartitioner) feed(partition int32) {
  70. p <- &partition
  71. }
  72. type flakyEncoder bool
  73. func (f flakyEncoder) Length() int {
  74. return len(TestMessage)
  75. }
  76. func (f flakyEncoder) Encode() ([]byte, error) {
  77. if !bool(f) {
  78. return nil, errors.New("flaky encoding error")
  79. }
  80. return []byte(TestMessage), nil
  81. }
  82. func TestAsyncProducer(t *testing.T) {
  83. seedBroker := NewMockBroker(t, 1)
  84. leader := NewMockBroker(t, 2)
  85. metadataResponse := new(MetadataResponse)
  86. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  87. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  88. seedBroker.Returns(metadataResponse)
  89. prodSuccess := new(ProduceResponse)
  90. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  91. leader.Returns(prodSuccess)
  92. config := NewConfig()
  93. config.Producer.Flush.Messages = 10
  94. config.Producer.Return.Successes = true
  95. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  96. if err != nil {
  97. t.Fatal(err)
  98. }
  99. for i := 0; i < 10; i++ {
  100. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
  101. }
  102. for i := 0; i < 10; i++ {
  103. select {
  104. case msg := <-producer.Errors():
  105. t.Error(msg.Err)
  106. if msg.Msg.flags != 0 {
  107. t.Error("Message had flags set")
  108. }
  109. case msg := <-producer.Successes():
  110. if msg.flags != 0 {
  111. t.Error("Message had flags set")
  112. }
  113. if msg.Metadata.(int) != i {
  114. t.Error("Message metadata did not match")
  115. }
  116. case <-time.After(time.Second):
  117. t.Errorf("Timeout waiting for msg #%d", i)
  118. goto done
  119. }
  120. }
  121. done:
  122. closeProducer(t, producer)
  123. leader.Close()
  124. seedBroker.Close()
  125. }
  126. func TestAsyncProducerMultipleFlushes(t *testing.T) {
  127. seedBroker := NewMockBroker(t, 1)
  128. leader := NewMockBroker(t, 2)
  129. metadataResponse := new(MetadataResponse)
  130. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  131. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  132. seedBroker.Returns(metadataResponse)
  133. prodSuccess := new(ProduceResponse)
  134. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  135. leader.Returns(prodSuccess)
  136. leader.Returns(prodSuccess)
  137. leader.Returns(prodSuccess)
  138. config := NewConfig()
  139. config.Producer.Flush.Messages = 5
  140. config.Producer.Return.Successes = true
  141. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  142. if err != nil {
  143. t.Fatal(err)
  144. }
  145. for flush := 0; flush < 3; flush++ {
  146. for i := 0; i < 5; i++ {
  147. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  148. }
  149. expectResults(t, producer, 5, 0)
  150. }
  151. closeProducer(t, producer)
  152. leader.Close()
  153. seedBroker.Close()
  154. }
  155. func TestAsyncProducerMultipleBrokers(t *testing.T) {
  156. seedBroker := NewMockBroker(t, 1)
  157. leader0 := NewMockBroker(t, 2)
  158. leader1 := NewMockBroker(t, 3)
  159. metadataResponse := new(MetadataResponse)
  160. metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
  161. metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
  162. metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
  163. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  164. seedBroker.Returns(metadataResponse)
  165. prodResponse0 := new(ProduceResponse)
  166. prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError)
  167. leader0.Returns(prodResponse0)
  168. prodResponse1 := new(ProduceResponse)
  169. prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
  170. leader1.Returns(prodResponse1)
  171. config := NewConfig()
  172. config.Producer.Flush.Messages = 5
  173. config.Producer.Return.Successes = true
  174. config.Producer.Partitioner = NewRoundRobinPartitioner
  175. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  176. if err != nil {
  177. t.Fatal(err)
  178. }
  179. for i := 0; i < 10; i++ {
  180. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  181. }
  182. expectResults(t, producer, 10, 0)
  183. closeProducer(t, producer)
  184. leader1.Close()
  185. leader0.Close()
  186. seedBroker.Close()
  187. }
  188. func TestAsyncProducerCustomPartitioner(t *testing.T) {
  189. seedBroker := NewMockBroker(t, 1)
  190. leader := NewMockBroker(t, 2)
  191. metadataResponse := new(MetadataResponse)
  192. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  193. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  194. seedBroker.Returns(metadataResponse)
  195. prodResponse := new(ProduceResponse)
  196. prodResponse.AddTopicPartition("my_topic", 0, ErrNoError)
  197. leader.Returns(prodResponse)
  198. config := NewConfig()
  199. config.Producer.Flush.Messages = 2
  200. config.Producer.Return.Successes = true
  201. config.Producer.Partitioner = func(topic string) Partitioner {
  202. p := make(testPartitioner)
  203. go func() {
  204. p.feed(0)
  205. p <- nil
  206. p <- nil
  207. p <- nil
  208. p.feed(0)
  209. }()
  210. return p
  211. }
  212. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  213. if err != nil {
  214. t.Fatal(err)
  215. }
  216. for i := 0; i < 5; i++ {
  217. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  218. }
  219. expectResults(t, producer, 2, 3)
  220. closeProducer(t, producer)
  221. leader.Close()
  222. seedBroker.Close()
  223. }
  224. func TestAsyncProducerFailureRetry(t *testing.T) {
  225. seedBroker := NewMockBroker(t, 1)
  226. leader1 := NewMockBroker(t, 2)
  227. leader2 := NewMockBroker(t, 3)
  228. metadataLeader1 := new(MetadataResponse)
  229. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  230. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  231. seedBroker.Returns(metadataLeader1)
  232. config := NewConfig()
  233. config.Producer.Flush.Messages = 10
  234. config.Producer.Return.Successes = true
  235. config.Producer.Retry.Backoff = 0
  236. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  237. if err != nil {
  238. t.Fatal(err)
  239. }
  240. seedBroker.Close()
  241. for i := 0; i < 10; i++ {
  242. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  243. }
  244. prodNotLeader := new(ProduceResponse)
  245. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  246. leader1.Returns(prodNotLeader)
  247. metadataLeader2 := new(MetadataResponse)
  248. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  249. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  250. leader1.Returns(metadataLeader2)
  251. prodSuccess := new(ProduceResponse)
  252. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  253. leader2.Returns(prodSuccess)
  254. expectResults(t, producer, 10, 0)
  255. leader1.Close()
  256. for i := 0; i < 10; i++ {
  257. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  258. }
  259. leader2.Returns(metadataLeader2)
  260. leader2.Returns(prodSuccess)
  261. expectResults(t, producer, 10, 0)
  262. leader2.Close()
  263. closeProducer(t, producer)
  264. }
  265. func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
  266. seedBroker := NewMockBroker(t, 1)
  267. leader1 := NewMockBroker(t, 2)
  268. leader2 := NewMockBroker(t, 3)
  269. metadataLeader1 := new(MetadataResponse)
  270. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  271. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  272. metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  273. seedBroker.Returns(metadataLeader1)
  274. config := NewConfig()
  275. config.Producer.Flush.Messages = 2
  276. config.Producer.Return.Successes = true
  277. config.Producer.Retry.Max = 0 // disable!
  278. config.Producer.Retry.Backoff = 0
  279. config.Producer.Partitioner = NewManualPartitioner
  280. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  281. if err != nil {
  282. t.Fatal(err)
  283. }
  284. seedBroker.Close()
  285. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  286. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
  287. prodNotLeader := new(ProduceResponse)
  288. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  289. prodNotLeader.AddTopicPartition("my_topic", 1, ErrNotLeaderForPartition)
  290. leader1.Returns(prodNotLeader)
  291. expectResults(t, producer, 0, 2)
  292. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  293. metadataLeader2 := new(MetadataResponse)
  294. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  295. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  296. metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, ErrNoError)
  297. leader1.Returns(metadataLeader2)
  298. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
  299. leader1.Returns(metadataLeader2)
  300. prodSuccess := new(ProduceResponse)
  301. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  302. prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
  303. leader2.Returns(prodSuccess)
  304. expectResults(t, producer, 2, 0)
  305. leader1.Close()
  306. leader2.Close()
  307. closeProducer(t, producer)
  308. }
  309. func TestAsyncProducerEncoderFailures(t *testing.T) {
  310. seedBroker := NewMockBroker(t, 1)
  311. leader := NewMockBroker(t, 2)
  312. metadataResponse := new(MetadataResponse)
  313. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  314. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  315. seedBroker.Returns(metadataResponse)
  316. prodSuccess := new(ProduceResponse)
  317. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  318. leader.Returns(prodSuccess)
  319. leader.Returns(prodSuccess)
  320. leader.Returns(prodSuccess)
  321. config := NewConfig()
  322. config.Producer.Flush.Messages = 1
  323. config.Producer.Return.Successes = true
  324. config.Producer.Partitioner = NewManualPartitioner
  325. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  326. if err != nil {
  327. t.Fatal(err)
  328. }
  329. for flush := 0; flush < 3; flush++ {
  330. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)}
  331. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)}
  332. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
  333. expectResults(t, producer, 1, 2)
  334. }
  335. closeProducer(t, producer)
  336. leader.Close()
  337. seedBroker.Close()
  338. }
  339. // If a Kafka broker becomes unavailable and then returns back in service, then
  340. // producer reconnects to it and continues sending messages.
  341. func TestAsyncProducerBrokerBounce(t *testing.T) {
  342. // Given
  343. seedBroker := NewMockBroker(t, 1)
  344. leader := NewMockBroker(t, 2)
  345. leaderAddr := leader.Addr()
  346. metadataResponse := new(MetadataResponse)
  347. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  348. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  349. seedBroker.Returns(metadataResponse)
  350. prodSuccess := new(ProduceResponse)
  351. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  352. config := NewConfig()
  353. config.Producer.Flush.Messages = 1
  354. config.Producer.Return.Successes = true
  355. config.Producer.Retry.Backoff = 0
  356. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  357. if err != nil {
  358. t.Fatal(err)
  359. }
  360. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  361. leader.Returns(prodSuccess)
  362. expectResults(t, producer, 1, 0)
  363. // When: a broker connection gets reset by a broker (network glitch, restart, you name it).
  364. leader.Close() // producer should get EOF
  365. leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
  366. seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again
  367. // Then: a produced message goes through the new broker connection.
  368. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  369. leader.Returns(prodSuccess)
  370. expectResults(t, producer, 1, 0)
  371. closeProducer(t, producer)
  372. seedBroker.Close()
  373. leader.Close()
  374. }
  375. func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
  376. seedBroker := NewMockBroker(t, 1)
  377. leader1 := NewMockBroker(t, 2)
  378. leader2 := NewMockBroker(t, 3)
  379. metadataLeader1 := new(MetadataResponse)
  380. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  381. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  382. seedBroker.Returns(metadataLeader1)
  383. config := NewConfig()
  384. config.Producer.Flush.Messages = 10
  385. config.Producer.Return.Successes = true
  386. config.Producer.Retry.Max = 3
  387. config.Producer.Retry.Backoff = 0
  388. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  389. if err != nil {
  390. t.Fatal(err)
  391. }
  392. for i := 0; i < 10; i++ {
  393. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  394. }
  395. leader1.Close() // producer should get EOF
  396. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  397. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  398. // ok fine, tell it to go to leader2 finally
  399. metadataLeader2 := new(MetadataResponse)
  400. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  401. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  402. seedBroker.Returns(metadataLeader2)
  403. prodSuccess := new(ProduceResponse)
  404. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  405. leader2.Returns(prodSuccess)
  406. expectResults(t, producer, 10, 0)
  407. seedBroker.Close()
  408. leader2.Close()
  409. closeProducer(t, producer)
  410. }
  411. func TestAsyncProducerMultipleRetries(t *testing.T) {
  412. seedBroker := NewMockBroker(t, 1)
  413. leader1 := NewMockBroker(t, 2)
  414. leader2 := NewMockBroker(t, 3)
  415. metadataLeader1 := new(MetadataResponse)
  416. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  417. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  418. seedBroker.Returns(metadataLeader1)
  419. config := NewConfig()
  420. config.Producer.Flush.Messages = 10
  421. config.Producer.Return.Successes = true
  422. config.Producer.Retry.Max = 4
  423. config.Producer.Retry.Backoff = 0
  424. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  425. if err != nil {
  426. t.Fatal(err)
  427. }
  428. for i := 0; i < 10; i++ {
  429. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  430. }
  431. prodNotLeader := new(ProduceResponse)
  432. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  433. leader1.Returns(prodNotLeader)
  434. metadataLeader2 := new(MetadataResponse)
  435. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  436. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  437. seedBroker.Returns(metadataLeader2)
  438. leader2.Returns(prodNotLeader)
  439. seedBroker.Returns(metadataLeader1)
  440. leader1.Returns(prodNotLeader)
  441. seedBroker.Returns(metadataLeader1)
  442. leader1.Returns(prodNotLeader)
  443. seedBroker.Returns(metadataLeader2)
  444. seedBroker.Returns(metadataLeader2)
  445. prodSuccess := new(ProduceResponse)
  446. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  447. leader2.Returns(prodSuccess)
  448. expectResults(t, producer, 10, 0)
  449. for i := 0; i < 10; i++ {
  450. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  451. }
  452. leader2.Returns(prodSuccess)
  453. expectResults(t, producer, 10, 0)
  454. seedBroker.Close()
  455. leader1.Close()
  456. leader2.Close()
  457. closeProducer(t, producer)
  458. }
  459. func TestAsyncProducerOutOfRetries(t *testing.T) {
  460. t.Skip("Enable once bug #294 is fixed.")
  461. seedBroker := NewMockBroker(t, 1)
  462. leader := NewMockBroker(t, 2)
  463. metadataResponse := new(MetadataResponse)
  464. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  465. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  466. seedBroker.Returns(metadataResponse)
  467. config := NewConfig()
  468. config.Producer.Flush.Messages = 10
  469. config.Producer.Return.Successes = true
  470. config.Producer.Retry.Backoff = 0
  471. config.Producer.Retry.Max = 0
  472. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  473. if err != nil {
  474. t.Fatal(err)
  475. }
  476. for i := 0; i < 10; i++ {
  477. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  478. }
  479. prodNotLeader := new(ProduceResponse)
  480. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  481. leader.Returns(prodNotLeader)
  482. for i := 0; i < 10; i++ {
  483. select {
  484. case msg := <-producer.Errors():
  485. if msg.Err != ErrNotLeaderForPartition {
  486. t.Error(msg.Err)
  487. }
  488. case <-producer.Successes():
  489. t.Error("Unexpected success")
  490. }
  491. }
  492. seedBroker.Returns(metadataResponse)
  493. for i := 0; i < 10; i++ {
  494. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  495. }
  496. prodSuccess := new(ProduceResponse)
  497. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  498. leader.Returns(prodSuccess)
  499. expectResults(t, producer, 10, 0)
  500. leader.Close()
  501. seedBroker.Close()
  502. safeClose(t, producer)
  503. }
  504. func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
  505. seedBroker := NewMockBroker(t, 1)
  506. leader := NewMockBroker(t, 2)
  507. leaderAddr := leader.Addr()
  508. metadataResponse := new(MetadataResponse)
  509. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  510. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  511. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
  512. seedBroker.Returns(metadataResponse)
  513. config := NewConfig()
  514. config.Producer.Return.Successes = true
  515. config.Producer.Retry.Backoff = 0
  516. config.Producer.Retry.Max = 1
  517. config.Producer.Partitioner = NewRoundRobinPartitioner
  518. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  519. if err != nil {
  520. t.Fatal(err)
  521. }
  522. // prime partition 0
  523. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  524. prodSuccess := new(ProduceResponse)
  525. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  526. leader.Returns(prodSuccess)
  527. expectResults(t, producer, 1, 0)
  528. // prime partition 1
  529. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  530. prodSuccess = new(ProduceResponse)
  531. prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
  532. leader.Returns(prodSuccess)
  533. expectResults(t, producer, 1, 0)
  534. // reboot the broker (the producer will get EOF on its existing connection)
  535. leader.Close()
  536. leader = NewMockBrokerAddr(t, 2, leaderAddr)
  537. // send another message on partition 0 to trigger the EOF and retry
  538. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  539. // tell partition 0 to go to that broker again
  540. seedBroker.Returns(metadataResponse)
  541. // succeed this time
  542. prodSuccess = new(ProduceResponse)
  543. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  544. leader.Returns(prodSuccess)
  545. expectResults(t, producer, 1, 0)
  546. // shutdown
  547. closeProducer(t, producer)
  548. seedBroker.Close()
  549. leader.Close()
  550. }
  551. func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
  552. seedBroker := NewMockBroker(t, 1)
  553. leader := NewMockBroker(t, 2)
  554. metadataResponse := new(MetadataResponse)
  555. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  556. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  557. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
  558. seedBroker.Returns(metadataResponse)
  559. config := NewConfig()
  560. config.Producer.Flush.Messages = 5
  561. config.Producer.Return.Successes = true
  562. config.Producer.Retry.Backoff = 0
  563. config.Producer.Retry.Max = 1
  564. config.Producer.Partitioner = NewManualPartitioner
  565. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  566. if err != nil {
  567. t.Fatal(err)
  568. }
  569. // prime partitions
  570. for p := int32(0); p < 2; p++ {
  571. for i := 0; i < 5; i++ {
  572. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p}
  573. }
  574. prodSuccess := new(ProduceResponse)
  575. prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
  576. leader.Returns(prodSuccess)
  577. expectResults(t, producer, 5, 0)
  578. }
  579. // send more messages on partition 0
  580. for i := 0; i < 5; i++ {
  581. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  582. }
  583. prodNotLeader := new(ProduceResponse)
  584. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  585. leader.Returns(prodNotLeader)
  586. time.Sleep(50 * time.Millisecond)
  587. leader.SetHandlerByMap(map[string]MockResponse{
  588. "ProduceRequest": NewMockProduceResponse(t).
  589. SetVersion(0).
  590. SetError("my_topic", 0, ErrNoError),
  591. })
  592. // tell partition 0 to go to that broker again
  593. seedBroker.Returns(metadataResponse)
  594. // succeed this time
  595. expectResults(t, producer, 5, 0)
  596. seedBroker.Returns(metadataResponse)
  597. // put five more through
  598. for i := 0; i < 5; i++ {
  599. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  600. }
  601. expectResults(t, producer, 5, 0)
  602. // shutdown
  603. closeProducer(t, producer)
  604. seedBroker.Close()
  605. leader.Close()
  606. }
  607. func TestAsyncProducerRetryShutdown(t *testing.T) {
  608. seedBroker := NewMockBroker(t, 1)
  609. leader := NewMockBroker(t, 2)
  610. metadataLeader := new(MetadataResponse)
  611. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  612. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  613. seedBroker.Returns(metadataLeader)
  614. config := NewConfig()
  615. config.Producer.Flush.Messages = 10
  616. config.Producer.Return.Successes = true
  617. config.Producer.Retry.Backoff = 0
  618. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  619. if err != nil {
  620. t.Fatal(err)
  621. }
  622. for i := 0; i < 10; i++ {
  623. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  624. }
  625. producer.AsyncClose()
  626. time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in
  627. producer.Input() <- &ProducerMessage{Topic: "FOO"}
  628. if err := <-producer.Errors(); err.Err != ErrShuttingDown {
  629. t.Error(err)
  630. }
  631. prodNotLeader := new(ProduceResponse)
  632. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  633. leader.Returns(prodNotLeader)
  634. seedBroker.Returns(metadataLeader)
  635. prodSuccess := new(ProduceResponse)
  636. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  637. leader.Returns(prodSuccess)
  638. expectResults(t, producer, 10, 0)
  639. seedBroker.Close()
  640. leader.Close()
  641. // wait for the async-closed producer to shut down fully
  642. for err := range producer.Errors() {
  643. t.Error(err)
  644. }
  645. }
  646. func TestAsyncProducerNoReturns(t *testing.T) {
  647. seedBroker := NewMockBroker(t, 1)
  648. leader := NewMockBroker(t, 2)
  649. metadataLeader := new(MetadataResponse)
  650. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  651. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  652. seedBroker.Returns(metadataLeader)
  653. config := NewConfig()
  654. config.Producer.Flush.Messages = 10
  655. config.Producer.Return.Successes = false
  656. config.Producer.Return.Errors = false
  657. config.Producer.Retry.Backoff = 0
  658. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  659. if err != nil {
  660. t.Fatal(err)
  661. }
  662. for i := 0; i < 10; i++ {
  663. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  664. }
  665. wait := make(chan bool)
  666. go func() {
  667. if err := producer.Close(); err != nil {
  668. t.Error(err)
  669. }
  670. close(wait)
  671. }()
  672. prodSuccess := new(ProduceResponse)
  673. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  674. leader.Returns(prodSuccess)
  675. <-wait
  676. seedBroker.Close()
  677. leader.Close()
  678. }
  679. func TestAsyncProducerIdempotentGoldenPath(t *testing.T) {
  680. broker := NewMockBroker(t, 1)
  681. metadataResponse := &MetadataResponse{
  682. Version: 1,
  683. ControllerID: 1,
  684. }
  685. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  686. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
  687. broker.Returns(metadataResponse)
  688. initProducerID := &InitProducerIDResponse{
  689. ThrottleTime: 0,
  690. ProducerID: 1000,
  691. ProducerEpoch: 1,
  692. }
  693. broker.Returns(initProducerID)
  694. config := NewConfig()
  695. config.Producer.Flush.Messages = 10
  696. config.Producer.Return.Successes = true
  697. config.Producer.Retry.Max = 4
  698. config.Producer.RequiredAcks = WaitForAll
  699. config.Producer.Retry.Backoff = 0
  700. config.Producer.Idempotent = true
  701. config.Net.MaxOpenRequests = 1
  702. config.Version = V0_11_0_0
  703. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  704. if err != nil {
  705. t.Fatal(err)
  706. }
  707. for i := 0; i < 10; i++ {
  708. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  709. }
  710. prodSuccess := &ProduceResponse{
  711. Version: 3,
  712. ThrottleTime: 0,
  713. }
  714. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  715. broker.Returns(prodSuccess)
  716. expectResults(t, producer, 10, 0)
  717. broker.Close()
  718. closeProducer(t, producer)
  719. }
  720. func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) {
  721. //Logger = log.New(os.Stderr, "", log.LstdFlags)
  722. tests := []struct {
  723. name string
  724. failAfterWrite bool
  725. }{
  726. {"FailAfterWrite", true},
  727. {"FailBeforeWrite", false},
  728. }
  729. for _, test := range tests {
  730. broker := NewMockBroker(t, 1)
  731. metadataResponse := &MetadataResponse{
  732. Version: 1,
  733. ControllerID: 1,
  734. }
  735. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  736. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
  737. initProducerIDResponse := &InitProducerIDResponse{
  738. ThrottleTime: 0,
  739. ProducerID: 1000,
  740. ProducerEpoch: 1,
  741. }
  742. prodNotLeaderResponse := &ProduceResponse{
  743. Version: 3,
  744. ThrottleTime: 0,
  745. }
  746. prodNotLeaderResponse.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas)
  747. prodDuplicate := &ProduceResponse{
  748. Version: 3,
  749. ThrottleTime: 0,
  750. }
  751. prodDuplicate.AddTopicPartition("my_topic", 0, ErrDuplicateSequenceNumber)
  752. prodOutOfSeq := &ProduceResponse{
  753. Version: 3,
  754. ThrottleTime: 0,
  755. }
  756. prodOutOfSeq.AddTopicPartition("my_topic", 0, ErrOutOfOrderSequenceNumber)
  757. prodSuccessResponse := &ProduceResponse{
  758. Version: 3,
  759. ThrottleTime: 0,
  760. }
  761. prodSuccessResponse.AddTopicPartition("my_topic", 0, ErrNoError)
  762. prodCounter := 0
  763. lastBatchFirstSeq := -1
  764. lastBatchSize := -1
  765. lastSequenceWrittenToDisk := -1
  766. handlerFailBeforeWrite := func(req *request) (res encoder) {
  767. switch req.body.key() {
  768. case 3:
  769. return metadataResponse
  770. case 22:
  771. return initProducerIDResponse
  772. case 0:
  773. prodCounter++
  774. preq := req.body.(*ProduceRequest)
  775. batch := preq.records["my_topic"][0].RecordBatch
  776. batchFirstSeq := int(batch.FirstSequence)
  777. batchSize := len(batch.Records)
  778. if lastSequenceWrittenToDisk == batchFirstSeq-1 { //in sequence append
  779. if lastBatchFirstSeq == batchFirstSeq { //is a batch retry
  780. if lastBatchSize == batchSize { //good retry
  781. // mock write to disk
  782. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  783. return prodSuccessResponse
  784. }
  785. t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
  786. return prodOutOfSeq
  787. } else { // not a retry
  788. // save batch just received for future check
  789. lastBatchFirstSeq = batchFirstSeq
  790. lastBatchSize = batchSize
  791. if prodCounter%2 == 1 {
  792. if test.failAfterWrite {
  793. // mock write to disk
  794. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  795. }
  796. return prodNotLeaderResponse
  797. }
  798. // mock write to disk
  799. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  800. return prodSuccessResponse
  801. }
  802. } else {
  803. if lastBatchFirstSeq == batchFirstSeq && lastBatchSize == batchSize { // is a good batch retry
  804. if lastSequenceWrittenToDisk == (batchFirstSeq + batchSize - 1) { // we already have the messages
  805. return prodDuplicate
  806. }
  807. // mock write to disk
  808. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  809. return prodSuccessResponse
  810. } else { //out of sequence / bad retried batch
  811. if lastBatchFirstSeq == batchFirstSeq && lastBatchSize != batchSize {
  812. t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
  813. } else if lastSequenceWrittenToDisk+1 != batchFirstSeq {
  814. t.Errorf("[%s] Out of sequence message lastSequence=%d new batch starts at=%d", test.name, lastSequenceWrittenToDisk, batchFirstSeq)
  815. } else {
  816. t.Errorf("[%s] Unexpected error", test.name)
  817. }
  818. return prodOutOfSeq
  819. }
  820. }
  821. }
  822. return nil
  823. }
  824. config := NewConfig()
  825. config.Version = V0_11_0_0
  826. config.Producer.Idempotent = true
  827. config.Net.MaxOpenRequests = 1
  828. config.Producer.RequiredAcks = WaitForAll
  829. config.Producer.Return.Successes = true
  830. config.Producer.Flush.Frequency = 50 * time.Millisecond
  831. config.Producer.Retry.Backoff = 100 * time.Millisecond
  832. broker.setHandler(handlerFailBeforeWrite)
  833. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  834. if err != nil {
  835. t.Fatal(err)
  836. }
  837. for i := 0; i < 3; i++ {
  838. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  839. }
  840. go func() {
  841. for i := 0; i < 7; i++ {
  842. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("goroutine")}
  843. time.Sleep(100 * time.Millisecond)
  844. }
  845. }()
  846. expectResults(t, producer, 10, 0)
  847. broker.Close()
  848. closeProducer(t, producer)
  849. }
  850. }
  851. func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
  852. broker := NewMockBroker(t, 1)
  853. metadataResponse := &MetadataResponse{
  854. Version: 1,
  855. ControllerID: 1,
  856. }
  857. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  858. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
  859. broker.Returns(metadataResponse)
  860. initProducerID := &InitProducerIDResponse{
  861. ThrottleTime: 0,
  862. ProducerID: 1000,
  863. ProducerEpoch: 1,
  864. }
  865. broker.Returns(initProducerID)
  866. config := NewConfig()
  867. config.Producer.Flush.Messages = 10
  868. config.Producer.Return.Successes = true
  869. config.Producer.Retry.Max = 400000
  870. config.Producer.RequiredAcks = WaitForAll
  871. config.Producer.Retry.Backoff = 0
  872. config.Producer.Idempotent = true
  873. config.Net.MaxOpenRequests = 1
  874. config.Version = V0_11_0_0
  875. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  876. if err != nil {
  877. t.Fatal(err)
  878. }
  879. for i := 0; i < 10; i++ {
  880. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  881. }
  882. prodOutOfSeq := &ProduceResponse{
  883. Version: 3,
  884. ThrottleTime: 0,
  885. }
  886. prodOutOfSeq.AddTopicPartition("my_topic", 0, ErrOutOfOrderSequenceNumber)
  887. broker.Returns(prodOutOfSeq)
  888. expectResults(t, producer, 0, 10)
  889. broker.Close()
  890. closeProducer(t, producer)
  891. }
  892. // This example shows how to use the producer while simultaneously
  893. // reading the Errors channel to know about any failures.
  894. func ExampleAsyncProducer_select() {
  895. producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
  896. if err != nil {
  897. panic(err)
  898. }
  899. defer func() {
  900. if err := producer.Close(); err != nil {
  901. log.Fatalln(err)
  902. }
  903. }()
  904. // Trap SIGINT to trigger a shutdown.
  905. signals := make(chan os.Signal, 1)
  906. signal.Notify(signals, os.Interrupt)
  907. var enqueued, errors int
  908. ProducerLoop:
  909. for {
  910. select {
  911. case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
  912. enqueued++
  913. case err := <-producer.Errors():
  914. log.Println("Failed to produce message", err)
  915. errors++
  916. case <-signals:
  917. break ProducerLoop
  918. }
  919. }
  920. log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
  921. }
  922. // This example shows how to use the producer with separate goroutines
  923. // reading from the Successes and Errors channels. Note that in order
  924. // for the Successes channel to be populated, you have to set
  925. // config.Producer.Return.Successes to true.
  926. func ExampleAsyncProducer_goroutines() {
  927. config := NewConfig()
  928. config.Producer.Return.Successes = true
  929. producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
  930. if err != nil {
  931. panic(err)
  932. }
  933. // Trap SIGINT to trigger a graceful shutdown.
  934. signals := make(chan os.Signal, 1)
  935. signal.Notify(signals, os.Interrupt)
  936. var (
  937. wg sync.WaitGroup
  938. enqueued, successes, errors int
  939. )
  940. wg.Add(1)
  941. go func() {
  942. defer wg.Done()
  943. for range producer.Successes() {
  944. successes++
  945. }
  946. }()
  947. wg.Add(1)
  948. go func() {
  949. defer wg.Done()
  950. for err := range producer.Errors() {
  951. log.Println(err)
  952. errors++
  953. }
  954. }()
  955. ProducerLoop:
  956. for {
  957. message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
  958. select {
  959. case producer.Input() <- message:
  960. enqueued++
  961. case <-signals:
  962. producer.AsyncClose() // Trigger a shutdown of the producer.
  963. break ProducerLoop
  964. }
  965. }
  966. wg.Wait()
  967. log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
  968. }