async_producer_test.go 43 KB


  1. package sarama
  2. import (
  3. "errors"
  4. "log"
  5. "os"
  6. "os/signal"
  7. "strconv"
  8. "sync"
  9. "sync/atomic"
  10. "testing"
  11. "time"
  12. "github.com/fortytw2/leaktest"
  13. "github.com/rcrowley/go-metrics"
  14. )
// TestMessage is the payload the tests in this file send as the message value;
// flakyEncoder also uses it for its length and encoded bytes.
const TestMessage = "ABC THE MESSAGE"
  16. func closeProducer(t *testing.T, p AsyncProducer) {
  17. var wg sync.WaitGroup
  18. p.AsyncClose()
  19. wg.Add(2)
  20. go func() {
  21. for range p.Successes() {
  22. t.Error("Unexpected message on Successes()")
  23. }
  24. wg.Done()
  25. }()
  26. go func() {
  27. for msg := range p.Errors() {
  28. t.Error(msg.Err)
  29. }
  30. wg.Done()
  31. }()
  32. wg.Wait()
  33. }
  34. func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
  35. expect := successes + errors
  36. for expect > 0 {
  37. select {
  38. case msg := <-p.Errors():
  39. if msg.Msg.flags != 0 {
  40. t.Error("Message had flags set")
  41. }
  42. errors--
  43. expect--
  44. if errors < 0 {
  45. t.Error(msg.Err)
  46. }
  47. case msg := <-p.Successes():
  48. if msg.flags != 0 {
  49. t.Error("Message had flags set")
  50. }
  51. successes--
  52. expect--
  53. if successes < 0 {
  54. t.Error("Too many successes")
  55. }
  56. }
  57. }
  58. if successes != 0 || errors != 0 {
  59. t.Error("Unexpected successes", successes, "or errors", errors)
  60. }
  61. }
  62. type testPartitioner chan *int32
  63. func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
  64. part := <-p
  65. if part == nil {
  66. return 0, errors.New("BOOM")
  67. }
  68. return *part, nil
  69. }
  70. func (p testPartitioner) RequiresConsistency() bool {
  71. return true
  72. }
  73. func (p testPartitioner) feed(partition int32) {
  74. p <- &partition
  75. }
  76. type flakyEncoder bool
  77. func (f flakyEncoder) Length() int {
  78. return len(TestMessage)
  79. }
  80. func (f flakyEncoder) Encode() ([]byte, error) {
  81. if !f {
  82. return nil, errors.New("flaky encoding error")
  83. }
  84. return []byte(TestMessage), nil
  85. }
  86. func TestAsyncProducer(t *testing.T) {
  87. seedBroker := NewMockBroker(t, 1)
  88. leader := NewMockBroker(t, 2)
  89. metadataResponse := new(MetadataResponse)
  90. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  91. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  92. seedBroker.Returns(metadataResponse)
  93. prodSuccess := new(ProduceResponse)
  94. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  95. leader.Returns(prodSuccess)
  96. config := NewConfig()
  97. config.Producer.Flush.Messages = 10
  98. config.Producer.Return.Successes = true
  99. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  100. if err != nil {
  101. t.Fatal(err)
  102. }
  103. for i := 0; i < 10; i++ {
  104. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
  105. }
  106. for i := 0; i < 10; i++ {
  107. select {
  108. case msg := <-producer.Errors():
  109. t.Error(msg.Err)
  110. if msg.Msg.flags != 0 {
  111. t.Error("Message had flags set")
  112. }
  113. case msg := <-producer.Successes():
  114. if msg.flags != 0 {
  115. t.Error("Message had flags set")
  116. }
  117. if msg.Metadata.(int) != i {
  118. t.Error("Message metadata did not match")
  119. }
  120. case <-time.After(time.Second):
  121. t.Errorf("Timeout waiting for msg #%d", i)
  122. goto done
  123. }
  124. }
  125. done:
  126. closeProducer(t, producer)
  127. leader.Close()
  128. seedBroker.Close()
  129. }
  130. func TestAsyncProducerMultipleFlushes(t *testing.T) {
  131. seedBroker := NewMockBroker(t, 1)
  132. leader := NewMockBroker(t, 2)
  133. metadataResponse := new(MetadataResponse)
  134. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  135. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  136. seedBroker.Returns(metadataResponse)
  137. prodSuccess := new(ProduceResponse)
  138. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  139. leader.Returns(prodSuccess)
  140. leader.Returns(prodSuccess)
  141. leader.Returns(prodSuccess)
  142. config := NewConfig()
  143. config.Producer.Flush.Messages = 5
  144. config.Producer.Return.Successes = true
  145. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  146. if err != nil {
  147. t.Fatal(err)
  148. }
  149. for flush := 0; flush < 3; flush++ {
  150. for i := 0; i < 5; i++ {
  151. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  152. }
  153. expectResults(t, producer, 5, 0)
  154. }
  155. closeProducer(t, producer)
  156. leader.Close()
  157. seedBroker.Close()
  158. }
  159. func TestAsyncProducerMultipleBrokers(t *testing.T) {
  160. seedBroker := NewMockBroker(t, 1)
  161. leader0 := NewMockBroker(t, 2)
  162. leader1 := NewMockBroker(t, 3)
  163. metadataResponse := new(MetadataResponse)
  164. metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
  165. metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
  166. metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, nil, ErrNoError)
  167. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  168. seedBroker.Returns(metadataResponse)
  169. prodResponse0 := new(ProduceResponse)
  170. prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError)
  171. leader0.Returns(prodResponse0)
  172. prodResponse1 := new(ProduceResponse)
  173. prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
  174. leader1.Returns(prodResponse1)
  175. config := NewConfig()
  176. config.Producer.Flush.Messages = 5
  177. config.Producer.Return.Successes = true
  178. config.Producer.Partitioner = NewRoundRobinPartitioner
  179. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  180. if err != nil {
  181. t.Fatal(err)
  182. }
  183. for i := 0; i < 10; i++ {
  184. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  185. }
  186. expectResults(t, producer, 10, 0)
  187. closeProducer(t, producer)
  188. leader1.Close()
  189. leader0.Close()
  190. seedBroker.Close()
  191. }
  192. func TestAsyncProducerCustomPartitioner(t *testing.T) {
  193. seedBroker := NewMockBroker(t, 1)
  194. leader := NewMockBroker(t, 2)
  195. metadataResponse := new(MetadataResponse)
  196. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  197. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  198. seedBroker.Returns(metadataResponse)
  199. prodResponse := new(ProduceResponse)
  200. prodResponse.AddTopicPartition("my_topic", 0, ErrNoError)
  201. leader.Returns(prodResponse)
  202. config := NewConfig()
  203. config.Producer.Flush.Messages = 2
  204. config.Producer.Return.Successes = true
  205. config.Producer.Partitioner = func(topic string) Partitioner {
  206. p := make(testPartitioner)
  207. go func() {
  208. p.feed(0)
  209. p <- nil
  210. p <- nil
  211. p <- nil
  212. p.feed(0)
  213. }()
  214. return p
  215. }
  216. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  217. if err != nil {
  218. t.Fatal(err)
  219. }
  220. for i := 0; i < 5; i++ {
  221. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  222. }
  223. expectResults(t, producer, 2, 3)
  224. closeProducer(t, producer)
  225. leader.Close()
  226. seedBroker.Close()
  227. }
  228. func TestAsyncProducerFailureRetry(t *testing.T) {
  229. seedBroker := NewMockBroker(t, 1)
  230. leader1 := NewMockBroker(t, 2)
  231. leader2 := NewMockBroker(t, 3)
  232. metadataLeader1 := new(MetadataResponse)
  233. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  234. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  235. seedBroker.Returns(metadataLeader1)
  236. config := NewConfig()
  237. config.Producer.Flush.Messages = 10
  238. config.Producer.Return.Successes = true
  239. config.Producer.Retry.Backoff = 0
  240. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  241. if err != nil {
  242. t.Fatal(err)
  243. }
  244. seedBroker.Close()
  245. for i := 0; i < 10; i++ {
  246. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  247. }
  248. prodNotLeader := new(ProduceResponse)
  249. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  250. leader1.Returns(prodNotLeader)
  251. metadataLeader2 := new(MetadataResponse)
  252. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  253. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  254. leader1.Returns(metadataLeader2)
  255. prodSuccess := new(ProduceResponse)
  256. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  257. leader2.Returns(prodSuccess)
  258. expectResults(t, producer, 10, 0)
  259. leader1.Close()
  260. for i := 0; i < 10; i++ {
  261. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  262. }
  263. leader2.Returns(prodSuccess)
  264. expectResults(t, producer, 10, 0)
  265. leader2.Close()
  266. closeProducer(t, producer)
  267. }
  268. func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
  269. tt := func(t *testing.T, kErr KError) {
  270. seedBroker := NewMockBroker(t, 1)
  271. leader1 := NewMockBroker(t, 2)
  272. leader2 := NewMockBroker(t, 3)
  273. metadataLeader1 := new(MetadataResponse)
  274. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  275. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  276. metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  277. seedBroker.Returns(metadataLeader1)
  278. config := NewConfig()
  279. config.Producer.Flush.Messages = 2
  280. config.Producer.Return.Successes = true
  281. config.Producer.Retry.Max = 0 // disable!
  282. config.Producer.Retry.Backoff = 0
  283. config.Producer.Partitioner = NewManualPartitioner
  284. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  285. if err != nil {
  286. t.Fatal(err)
  287. }
  288. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  289. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
  290. prodNotLeader := new(ProduceResponse)
  291. prodNotLeader.AddTopicPartition("my_topic", 0, kErr)
  292. prodNotLeader.AddTopicPartition("my_topic", 1, kErr)
  293. leader1.Returns(prodNotLeader)
  294. expectResults(t, producer, 0, 2)
  295. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  296. metadataLeader2 := new(MetadataResponse)
  297. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  298. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  299. metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  300. leader1.Returns(metadataLeader2)
  301. leader1.Returns(metadataLeader2)
  302. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
  303. prodSuccess := new(ProduceResponse)
  304. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  305. prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
  306. leader2.Returns(prodSuccess)
  307. expectResults(t, producer, 2, 0)
  308. seedBroker.Close()
  309. leader1.Close()
  310. leader2.Close()
  311. closeProducer(t, producer)
  312. }
  313. t.Run("retriable error", func(t *testing.T) {
  314. tt(t, ErrNotLeaderForPartition)
  315. })
  316. t.Run("non-retriable error", func(t *testing.T) {
  317. tt(t, ErrNotController)
  318. })
  319. }
  320. func TestAsyncProducerEncoderFailures(t *testing.T) {
  321. seedBroker := NewMockBroker(t, 1)
  322. leader := NewMockBroker(t, 2)
  323. metadataResponse := new(MetadataResponse)
  324. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  325. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  326. seedBroker.Returns(metadataResponse)
  327. prodSuccess := new(ProduceResponse)
  328. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  329. leader.Returns(prodSuccess)
  330. leader.Returns(prodSuccess)
  331. leader.Returns(prodSuccess)
  332. config := NewConfig()
  333. config.Producer.Flush.Messages = 1
  334. config.Producer.Return.Successes = true
  335. config.Producer.Partitioner = NewManualPartitioner
  336. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  337. if err != nil {
  338. t.Fatal(err)
  339. }
  340. for flush := 0; flush < 3; flush++ {
  341. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)}
  342. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)}
  343. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
  344. expectResults(t, producer, 1, 2)
  345. }
  346. closeProducer(t, producer)
  347. leader.Close()
  348. seedBroker.Close()
  349. }
  350. // If a Kafka broker becomes unavailable and then returns back in service, then
  351. // producer reconnects to it and continues sending messages.
  352. func TestAsyncProducerBrokerBounce(t *testing.T) {
  353. // Given
  354. seedBroker := NewMockBroker(t, 1)
  355. leader := NewMockBroker(t, 2)
  356. leaderAddr := leader.Addr()
  357. metadataResponse := new(MetadataResponse)
  358. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  359. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  360. seedBroker.Returns(metadataResponse)
  361. prodSuccess := new(ProduceResponse)
  362. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  363. config := NewConfig()
  364. config.Producer.Flush.Messages = 1
  365. config.Producer.Return.Successes = true
  366. config.Producer.Retry.Backoff = 0
  367. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  368. if err != nil {
  369. t.Fatal(err)
  370. }
  371. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  372. leader.Returns(prodSuccess)
  373. expectResults(t, producer, 1, 0)
  374. // When: a broker connection gets reset by a broker (network glitch, restart, you name it).
  375. leader.Close() // producer should get EOF
  376. leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
  377. seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again
  378. // Then: a produced message goes through the new broker connection.
  379. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  380. leader.Returns(prodSuccess)
  381. expectResults(t, producer, 1, 0)
  382. closeProducer(t, producer)
  383. seedBroker.Close()
  384. leader.Close()
  385. }
  386. func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
  387. seedBroker := NewMockBroker(t, 1)
  388. leader1 := NewMockBroker(t, 2)
  389. leader2 := NewMockBroker(t, 3)
  390. metadataLeader1 := new(MetadataResponse)
  391. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  392. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  393. seedBroker.Returns(metadataLeader1)
  394. config := NewConfig()
  395. config.Producer.Flush.Messages = 10
  396. config.Producer.Return.Successes = true
  397. config.Producer.Retry.Max = 3
  398. config.Producer.Retry.Backoff = 0
  399. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  400. if err != nil {
  401. t.Fatal(err)
  402. }
  403. for i := 0; i < 10; i++ {
  404. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  405. }
  406. leader1.Close() // producer should get EOF
  407. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  408. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  409. // ok fine, tell it to go to leader2 finally
  410. metadataLeader2 := new(MetadataResponse)
  411. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  412. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  413. seedBroker.Returns(metadataLeader2)
  414. prodSuccess := new(ProduceResponse)
  415. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  416. leader2.Returns(prodSuccess)
  417. expectResults(t, producer, 10, 0)
  418. seedBroker.Close()
  419. leader2.Close()
  420. closeProducer(t, producer)
  421. }
  422. func TestAsyncProducerMultipleRetries(t *testing.T) {
  423. seedBroker := NewMockBroker(t, 1)
  424. leader1 := NewMockBroker(t, 2)
  425. leader2 := NewMockBroker(t, 3)
  426. metadataLeader1 := new(MetadataResponse)
  427. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  428. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  429. seedBroker.Returns(metadataLeader1)
  430. config := NewConfig()
  431. config.Producer.Flush.Messages = 10
  432. config.Producer.Return.Successes = true
  433. config.Producer.Retry.Max = 4
  434. config.Producer.Retry.Backoff = 0
  435. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  436. if err != nil {
  437. t.Fatal(err)
  438. }
  439. for i := 0; i < 10; i++ {
  440. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  441. }
  442. prodNotLeader := new(ProduceResponse)
  443. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  444. leader1.Returns(prodNotLeader)
  445. metadataLeader2 := new(MetadataResponse)
  446. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  447. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  448. seedBroker.Returns(metadataLeader2)
  449. leader2.Returns(prodNotLeader)
  450. seedBroker.Returns(metadataLeader1)
  451. leader1.Returns(prodNotLeader)
  452. seedBroker.Returns(metadataLeader1)
  453. leader1.Returns(prodNotLeader)
  454. seedBroker.Returns(metadataLeader2)
  455. prodSuccess := new(ProduceResponse)
  456. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  457. leader2.Returns(prodSuccess)
  458. expectResults(t, producer, 10, 0)
  459. for i := 0; i < 10; i++ {
  460. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  461. }
  462. leader2.Returns(prodSuccess)
  463. expectResults(t, producer, 10, 0)
  464. seedBroker.Close()
  465. leader1.Close()
  466. leader2.Close()
  467. closeProducer(t, producer)
  468. }
  469. func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
  470. seedBroker := NewMockBroker(t, 1)
  471. leader1 := NewMockBroker(t, 2)
  472. leader2 := NewMockBroker(t, 3)
  473. metadataLeader1 := new(MetadataResponse)
  474. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  475. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  476. seedBroker.Returns(metadataLeader1)
  477. config := NewConfig()
  478. config.Producer.Flush.Messages = 1
  479. config.Producer.Return.Successes = true
  480. config.Producer.Retry.Max = 4
  481. backoffCalled := make([]int32, config.Producer.Retry.Max+1)
  482. config.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
  483. atomic.AddInt32(&backoffCalled[retries-1], 1)
  484. return 0
  485. }
  486. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  487. if err != nil {
  488. t.Fatal(err)
  489. }
  490. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  491. prodNotLeader := new(ProduceResponse)
  492. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  493. prodSuccess := new(ProduceResponse)
  494. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  495. metadataLeader2 := new(MetadataResponse)
  496. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  497. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  498. leader1.Returns(prodNotLeader)
  499. seedBroker.Returns(metadataLeader2)
  500. leader2.Returns(prodNotLeader)
  501. seedBroker.Returns(metadataLeader1)
  502. leader1.Returns(prodNotLeader)
  503. seedBroker.Returns(metadataLeader1)
  504. leader1.Returns(prodNotLeader)
  505. seedBroker.Returns(metadataLeader2)
  506. leader2.Returns(prodSuccess)
  507. expectResults(t, producer, 1, 0)
  508. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  509. leader2.Returns(prodSuccess)
  510. expectResults(t, producer, 1, 0)
  511. seedBroker.Close()
  512. leader1.Close()
  513. leader2.Close()
  514. closeProducer(t, producer)
  515. for i := 0; i < config.Producer.Retry.Max; i++ {
  516. if atomic.LoadInt32(&backoffCalled[i]) != 1 {
  517. t.Errorf("expected one retry attempt #%d", i)
  518. }
  519. }
  520. if atomic.LoadInt32(&backoffCalled[config.Producer.Retry.Max]) != 0 {
  521. t.Errorf("expected no retry attempt #%d", config.Producer.Retry.Max)
  522. }
  523. }
  524. func TestAsyncProducerOutOfRetries(t *testing.T) {
  525. t.Skip("Enable once bug #294 is fixed.")
  526. seedBroker := NewMockBroker(t, 1)
  527. leader := NewMockBroker(t, 2)
  528. metadataResponse := new(MetadataResponse)
  529. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  530. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  531. seedBroker.Returns(metadataResponse)
  532. config := NewConfig()
  533. config.Producer.Flush.Messages = 10
  534. config.Producer.Return.Successes = true
  535. config.Producer.Retry.Backoff = 0
  536. config.Producer.Retry.Max = 0
  537. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  538. if err != nil {
  539. t.Fatal(err)
  540. }
  541. for i := 0; i < 10; i++ {
  542. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  543. }
  544. prodNotLeader := new(ProduceResponse)
  545. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  546. leader.Returns(prodNotLeader)
  547. for i := 0; i < 10; i++ {
  548. select {
  549. case msg := <-producer.Errors():
  550. if msg.Err != ErrNotLeaderForPartition {
  551. t.Error(msg.Err)
  552. }
  553. case <-producer.Successes():
  554. t.Error("Unexpected success")
  555. }
  556. }
  557. seedBroker.Returns(metadataResponse)
  558. for i := 0; i < 10; i++ {
  559. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  560. }
  561. prodSuccess := new(ProduceResponse)
  562. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  563. leader.Returns(prodSuccess)
  564. expectResults(t, producer, 10, 0)
  565. leader.Close()
  566. seedBroker.Close()
  567. safeClose(t, producer)
  568. }
  569. func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
  570. seedBroker := NewMockBroker(t, 1)
  571. leader := NewMockBroker(t, 2)
  572. leaderAddr := leader.Addr()
  573. metadataResponse := new(MetadataResponse)
  574. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  575. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  576. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, nil, ErrNoError)
  577. seedBroker.Returns(metadataResponse)
  578. config := NewConfig()
  579. config.Producer.Return.Successes = true
  580. config.Producer.Retry.Backoff = 0
  581. config.Producer.Retry.Max = 1
  582. config.Producer.Partitioner = NewRoundRobinPartitioner
  583. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  584. if err != nil {
  585. t.Fatal(err)
  586. }
  587. // prime partition 0
  588. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  589. prodSuccess := new(ProduceResponse)
  590. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  591. leader.Returns(prodSuccess)
  592. expectResults(t, producer, 1, 0)
  593. // prime partition 1
  594. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  595. prodSuccess = new(ProduceResponse)
  596. prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
  597. leader.Returns(prodSuccess)
  598. expectResults(t, producer, 1, 0)
  599. // reboot the broker (the producer will get EOF on its existing connection)
  600. leader.Close()
  601. leader = NewMockBrokerAddr(t, 2, leaderAddr)
  602. // send another message on partition 0 to trigger the EOF and retry
  603. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  604. // tell partition 0 to go to that broker again
  605. seedBroker.Returns(metadataResponse)
  606. // succeed this time
  607. prodSuccess = new(ProduceResponse)
  608. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  609. leader.Returns(prodSuccess)
  610. expectResults(t, producer, 1, 0)
  611. // shutdown
  612. closeProducer(t, producer)
  613. seedBroker.Close()
  614. leader.Close()
  615. }
  616. func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
  617. seedBroker := NewMockBroker(t, 1)
  618. leader := NewMockBroker(t, 2)
  619. metadataResponse := new(MetadataResponse)
  620. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  621. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  622. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, nil, ErrNoError)
  623. seedBroker.Returns(metadataResponse)
  624. config := NewConfig()
  625. config.Producer.Flush.Messages = 5
  626. config.Producer.Return.Successes = true
  627. config.Producer.Retry.Backoff = 0
  628. config.Producer.Retry.Max = 1
  629. config.Producer.Partitioner = NewManualPartitioner
  630. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  631. if err != nil {
  632. t.Fatal(err)
  633. }
  634. // prime partitions
  635. for p := int32(0); p < 2; p++ {
  636. for i := 0; i < 5; i++ {
  637. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p}
  638. }
  639. prodSuccess := new(ProduceResponse)
  640. prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
  641. leader.Returns(prodSuccess)
  642. expectResults(t, producer, 5, 0)
  643. }
  644. // send more messages on partition 0
  645. for i := 0; i < 5; i++ {
  646. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  647. }
  648. prodNotLeader := new(ProduceResponse)
  649. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  650. leader.Returns(prodNotLeader)
  651. time.Sleep(50 * time.Millisecond)
  652. leader.SetHandlerByMap(map[string]MockResponse{
  653. "ProduceRequest": NewMockProduceResponse(t).
  654. SetVersion(0).
  655. SetError("my_topic", 0, ErrNoError),
  656. })
  657. // tell partition 0 to go to that broker again
  658. seedBroker.Returns(metadataResponse)
  659. // succeed this time
  660. expectResults(t, producer, 5, 0)
  661. // put five more through
  662. for i := 0; i < 5; i++ {
  663. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  664. }
  665. expectResults(t, producer, 5, 0)
  666. // shutdown
  667. closeProducer(t, producer)
  668. seedBroker.Close()
  669. leader.Close()
  670. }
  671. func TestAsyncProducerRetryShutdown(t *testing.T) {
  672. seedBroker := NewMockBroker(t, 1)
  673. leader := NewMockBroker(t, 2)
  674. metadataLeader := new(MetadataResponse)
  675. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  676. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  677. seedBroker.Returns(metadataLeader)
  678. config := NewConfig()
  679. config.Producer.Flush.Messages = 10
  680. config.Producer.Return.Successes = true
  681. config.Producer.Retry.Backoff = 0
  682. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  683. if err != nil {
  684. t.Fatal(err)
  685. }
  686. for i := 0; i < 10; i++ {
  687. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  688. }
  689. producer.AsyncClose()
  690. time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in
  691. producer.Input() <- &ProducerMessage{Topic: "FOO"}
  692. if err := <-producer.Errors(); err.Err != ErrShuttingDown {
  693. t.Error(err)
  694. }
  695. prodNotLeader := new(ProduceResponse)
  696. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  697. leader.Returns(prodNotLeader)
  698. seedBroker.Returns(metadataLeader)
  699. prodSuccess := new(ProduceResponse)
  700. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  701. leader.Returns(prodSuccess)
  702. expectResults(t, producer, 10, 0)
  703. seedBroker.Close()
  704. leader.Close()
  705. // wait for the async-closed producer to shut down fully
  706. for err := range producer.Errors() {
  707. t.Error(err)
  708. }
  709. }
  710. func TestAsyncProducerNoReturns(t *testing.T) {
  711. seedBroker := NewMockBroker(t, 1)
  712. leader := NewMockBroker(t, 2)
  713. metadataLeader := new(MetadataResponse)
  714. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  715. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  716. seedBroker.Returns(metadataLeader)
  717. config := NewConfig()
  718. config.Producer.Flush.Messages = 10
  719. config.Producer.Return.Successes = false
  720. config.Producer.Return.Errors = false
  721. config.Producer.Retry.Backoff = 0
  722. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  723. if err != nil {
  724. t.Fatal(err)
  725. }
  726. for i := 0; i < 10; i++ {
  727. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  728. }
  729. wait := make(chan bool)
  730. go func() {
  731. if err := producer.Close(); err != nil {
  732. t.Error(err)
  733. }
  734. close(wait)
  735. }()
  736. prodSuccess := new(ProduceResponse)
  737. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  738. leader.Returns(prodSuccess)
  739. <-wait
  740. seedBroker.Close()
  741. leader.Close()
  742. }
  743. func TestAsyncProducerIdempotentGoldenPath(t *testing.T) {
  744. broker := NewMockBroker(t, 1)
  745. metadataResponse := &MetadataResponse{
  746. Version: 1,
  747. ControllerID: 1,
  748. }
  749. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  750. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
  751. broker.Returns(metadataResponse)
  752. initProducerID := &InitProducerIDResponse{
  753. ThrottleTime: 0,
  754. ProducerID: 1000,
  755. ProducerEpoch: 1,
  756. }
  757. broker.Returns(initProducerID)
  758. config := NewConfig()
  759. config.Producer.Flush.Messages = 10
  760. config.Producer.Return.Successes = true
  761. config.Producer.Retry.Max = 4
  762. config.Producer.RequiredAcks = WaitForAll
  763. config.Producer.Retry.Backoff = 0
  764. config.Producer.Idempotent = true
  765. config.Net.MaxOpenRequests = 1
  766. config.Version = V0_11_0_0
  767. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  768. if err != nil {
  769. t.Fatal(err)
  770. }
  771. for i := 0; i < 10; i++ {
  772. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  773. }
  774. prodSuccess := &ProduceResponse{
  775. Version: 3,
  776. ThrottleTime: 0,
  777. }
  778. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  779. broker.Returns(prodSuccess)
  780. expectResults(t, producer, 10, 0)
  781. broker.Close()
  782. closeProducer(t, producer)
  783. }
  784. func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) {
  785. //Logger = log.New(os.Stderr, "", log.LstdFlags)
  786. tests := []struct {
  787. name string
  788. failAfterWrite bool
  789. }{
  790. {"FailAfterWrite", true},
  791. {"FailBeforeWrite", false},
  792. }
  793. for _, test := range tests {
  794. broker := NewMockBroker(t, 1)
  795. metadataResponse := &MetadataResponse{
  796. Version: 1,
  797. ControllerID: 1,
  798. }
  799. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  800. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
  801. initProducerIDResponse := &InitProducerIDResponse{
  802. ThrottleTime: 0,
  803. ProducerID: 1000,
  804. ProducerEpoch: 1,
  805. }
  806. prodNotLeaderResponse := &ProduceResponse{
  807. Version: 3,
  808. ThrottleTime: 0,
  809. }
  810. prodNotLeaderResponse.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas)
  811. prodDuplicate := &ProduceResponse{
  812. Version: 3,
  813. ThrottleTime: 0,
  814. }
  815. prodDuplicate.AddTopicPartition("my_topic", 0, ErrDuplicateSequenceNumber)
  816. prodOutOfSeq := &ProduceResponse{
  817. Version: 3,
  818. ThrottleTime: 0,
  819. }
  820. prodOutOfSeq.AddTopicPartition("my_topic", 0, ErrOutOfOrderSequenceNumber)
  821. prodSuccessResponse := &ProduceResponse{
  822. Version: 3,
  823. ThrottleTime: 0,
  824. }
  825. prodSuccessResponse.AddTopicPartition("my_topic", 0, ErrNoError)
  826. prodCounter := 0
  827. lastBatchFirstSeq := -1
  828. lastBatchSize := -1
  829. lastSequenceWrittenToDisk := -1
  830. handlerFailBeforeWrite := func(req *request) (res encoderWithHeader) {
  831. switch req.body.key() {
  832. case 3:
  833. return metadataResponse
  834. case 22:
  835. return initProducerIDResponse
  836. case 0:
  837. prodCounter++
  838. preq := req.body.(*ProduceRequest)
  839. batch := preq.records["my_topic"][0].RecordBatch
  840. batchFirstSeq := int(batch.FirstSequence)
  841. batchSize := len(batch.Records)
  842. if lastSequenceWrittenToDisk == batchFirstSeq-1 { //in sequence append
  843. if lastBatchFirstSeq == batchFirstSeq { //is a batch retry
  844. if lastBatchSize == batchSize { //good retry
  845. // mock write to disk
  846. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  847. return prodSuccessResponse
  848. }
  849. t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
  850. return prodOutOfSeq
  851. } // not a retry
  852. // save batch just received for future check
  853. lastBatchFirstSeq = batchFirstSeq
  854. lastBatchSize = batchSize
  855. if prodCounter%2 == 1 {
  856. if test.failAfterWrite {
  857. // mock write to disk
  858. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  859. }
  860. return prodNotLeaderResponse
  861. }
  862. // mock write to disk
  863. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  864. return prodSuccessResponse
  865. }
  866. if lastBatchFirstSeq == batchFirstSeq && lastBatchSize == batchSize { // is a good batch retry
  867. if lastSequenceWrittenToDisk == (batchFirstSeq + batchSize - 1) { // we already have the messages
  868. return prodDuplicate
  869. }
  870. // mock write to disk
  871. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  872. return prodSuccessResponse
  873. } //out of sequence / bad retried batch
  874. if lastBatchFirstSeq == batchFirstSeq && lastBatchSize != batchSize {
  875. t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
  876. } else if lastSequenceWrittenToDisk+1 != batchFirstSeq {
  877. t.Errorf("[%s] Out of sequence message lastSequence=%d new batch starts at=%d", test.name, lastSequenceWrittenToDisk, batchFirstSeq)
  878. } else {
  879. t.Errorf("[%s] Unexpected error", test.name)
  880. }
  881. return prodOutOfSeq
  882. }
  883. return nil
  884. }
  885. config := NewConfig()
  886. config.Version = V0_11_0_0
  887. config.Producer.Idempotent = true
  888. config.Net.MaxOpenRequests = 1
  889. config.Producer.RequiredAcks = WaitForAll
  890. config.Producer.Return.Successes = true
  891. config.Producer.Flush.Frequency = 50 * time.Millisecond
  892. config.Producer.Retry.Backoff = 100 * time.Millisecond
  893. broker.setHandler(handlerFailBeforeWrite)
  894. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  895. if err != nil {
  896. t.Fatal(err)
  897. }
  898. for i := 0; i < 3; i++ {
  899. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  900. }
  901. go func() {
  902. for i := 0; i < 7; i++ {
  903. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("goroutine")}
  904. time.Sleep(100 * time.Millisecond)
  905. }
  906. }()
  907. expectResults(t, producer, 10, 0)
  908. broker.Close()
  909. closeProducer(t, producer)
  910. }
  911. }
  912. func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
  913. broker := NewMockBroker(t, 1)
  914. metadataResponse := &MetadataResponse{
  915. Version: 1,
  916. ControllerID: 1,
  917. }
  918. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  919. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
  920. broker.Returns(metadataResponse)
  921. initProducerID := &InitProducerIDResponse{
  922. ThrottleTime: 0,
  923. ProducerID: 1000,
  924. ProducerEpoch: 1,
  925. }
  926. broker.Returns(initProducerID)
  927. config := NewConfig()
  928. config.Producer.Flush.Messages = 10
  929. config.Producer.Return.Successes = true
  930. config.Producer.Retry.Max = 400000
  931. config.Producer.RequiredAcks = WaitForAll
  932. config.Producer.Retry.Backoff = 0
  933. config.Producer.Idempotent = true
  934. config.Net.MaxOpenRequests = 1
  935. config.Version = V0_11_0_0
  936. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  937. if err != nil {
  938. t.Fatal(err)
  939. }
  940. for i := 0; i < 10; i++ {
  941. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  942. }
  943. prodOutOfSeq := &ProduceResponse{
  944. Version: 3,
  945. ThrottleTime: 0,
  946. }
  947. prodOutOfSeq.AddTopicPartition("my_topic", 0, ErrOutOfOrderSequenceNumber)
  948. broker.Returns(prodOutOfSeq)
  949. expectResults(t, producer, 0, 10)
  950. broker.Close()
  951. closeProducer(t, producer)
  952. }
  953. func TestAsyncProducerIdempotentEpochRollover(t *testing.T) {
  954. broker := NewMockBroker(t, 1)
  955. defer broker.Close()
  956. metadataResponse := &MetadataResponse{
  957. Version: 1,
  958. ControllerID: 1,
  959. }
  960. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  961. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
  962. broker.Returns(metadataResponse)
  963. initProducerID := &InitProducerIDResponse{
  964. ThrottleTime: 0,
  965. ProducerID: 1000,
  966. ProducerEpoch: 1,
  967. }
  968. broker.Returns(initProducerID)
  969. config := NewConfig()
  970. config.Producer.Flush.Messages = 10
  971. config.Producer.Flush.Frequency = 10 * time.Millisecond
  972. config.Producer.Return.Successes = true
  973. config.Producer.Retry.Max = 1 // This test needs to exercise what happens when retries exhaust
  974. config.Producer.RequiredAcks = WaitForAll
  975. config.Producer.Retry.Backoff = 0
  976. config.Producer.Idempotent = true
  977. config.Net.MaxOpenRequests = 1
  978. config.Version = V0_11_0_0
  979. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  980. if err != nil {
  981. t.Fatal(err)
  982. }
  983. defer closeProducer(t, producer)
  984. producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
  985. prodError := &ProduceResponse{
  986. Version: 3,
  987. ThrottleTime: 0,
  988. }
  989. prodError.AddTopicPartition("my_topic", 0, ErrBrokerNotAvailable)
  990. broker.Returns(prodError)
  991. <-producer.Errors()
  992. lastReqRes := broker.history[len(broker.history)-1]
  993. lastProduceBatch := lastReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
  994. if lastProduceBatch.FirstSequence != 0 {
  995. t.Error("first sequence not zero")
  996. }
  997. if lastProduceBatch.ProducerEpoch != 1 {
  998. t.Error("first epoch was not one")
  999. }
  1000. // Now if we produce again, the epoch should have rolled over.
  1001. producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}
  1002. broker.Returns(prodError)
  1003. <-producer.Errors()
  1004. lastReqRes = broker.history[len(broker.history)-1]
  1005. lastProduceBatch = lastReqRes.Request.(*ProduceRequest).records["my_topic"][0].RecordBatch
  1006. if lastProduceBatch.FirstSequence != 0 {
  1007. t.Error("second sequence not zero")
  1008. }
  1009. if lastProduceBatch.ProducerEpoch <= 1 {
  1010. t.Error("second epoch was not > 1")
  1011. }
  1012. }
  1013. // TestBrokerProducerShutdown ensures that a call to shutdown stops the
  1014. // brokerProducer run() loop and doesn't leak any goroutines
  1015. func TestBrokerProducerShutdown(t *testing.T) {
  1016. defer leaktest.Check(t)()
  1017. metrics.UseNilMetrics = true // disable Sarama's go-metrics library
  1018. defer func() {
  1019. metrics.UseNilMetrics = false
  1020. }()
  1021. mockBroker := NewMockBroker(t, 1)
  1022. metadataResponse := &MetadataResponse{}
  1023. metadataResponse.AddBroker(mockBroker.Addr(), mockBroker.BrokerID())
  1024. metadataResponse.AddTopicPartition(
  1025. "my_topic", 0, mockBroker.BrokerID(), nil, nil, nil, ErrNoError)
  1026. mockBroker.Returns(metadataResponse)
  1027. producer, err := NewAsyncProducer([]string{mockBroker.Addr()}, nil)
  1028. if err != nil {
  1029. t.Fatal(err)
  1030. }
  1031. broker := &Broker{
  1032. addr: mockBroker.Addr(),
  1033. id: mockBroker.BrokerID(),
  1034. }
  1035. bp := producer.(*asyncProducer).newBrokerProducer(broker)
  1036. bp.shutdown()
  1037. _ = producer.Close()
  1038. mockBroker.Close()
  1039. }
  1040. type appendInterceptor struct {
  1041. i int
  1042. }
  1043. func (b *appendInterceptor) OnSend(msg *ProducerMessage) {
  1044. if b.i < 0 {
  1045. panic("hey, the interceptor has failed")
  1046. }
  1047. v, _ := msg.Value.Encode()
  1048. msg.Value = StringEncoder(string(v) + strconv.Itoa(b.i))
  1049. b.i++
  1050. }
  1051. func (b *appendInterceptor) OnConsume(msg *ConsumerMessage) {
  1052. if b.i < 0 {
  1053. panic("hey, the interceptor has failed")
  1054. }
  1055. msg.Value = []byte(string(msg.Value) + strconv.Itoa(b.i))
  1056. b.i++
  1057. }
  1058. func testProducerInterceptor(
  1059. t *testing.T,
  1060. interceptors []ProducerInterceptor,
  1061. expectationFn func(*testing.T, int, *ProducerMessage),
  1062. ) {
  1063. seedBroker := NewMockBroker(t, 1)
  1064. leader := NewMockBroker(t, 2)
  1065. metadataLeader := new(MetadataResponse)
  1066. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  1067. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  1068. seedBroker.Returns(metadataLeader)
  1069. config := NewConfig()
  1070. config.Producer.Flush.Messages = 10
  1071. config.Producer.Return.Successes = true
  1072. config.Producer.Interceptors = interceptors
  1073. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  1074. if err != nil {
  1075. t.Fatal(err)
  1076. }
  1077. for i := 0; i < 10; i++ {
  1078. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  1079. }
  1080. prodSuccess := new(ProduceResponse)
  1081. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  1082. leader.Returns(prodSuccess)
  1083. for i := 0; i < 10; i++ {
  1084. select {
  1085. case msg := <-producer.Errors():
  1086. t.Error(msg.Err)
  1087. case msg := <-producer.Successes():
  1088. expectationFn(t, i, msg)
  1089. }
  1090. }
  1091. closeProducer(t, producer)
  1092. leader.Close()
  1093. seedBroker.Close()
  1094. }
  1095. func TestAsyncProducerInterceptors(t *testing.T) {
  1096. tests := []struct {
  1097. name string
  1098. interceptors []ProducerInterceptor
  1099. expectationFn func(*testing.T, int, *ProducerMessage)
  1100. }{
  1101. {
  1102. name: "intercept messages",
  1103. interceptors: []ProducerInterceptor{&appendInterceptor{i: 0}},
  1104. expectationFn: func(t *testing.T, i int, msg *ProducerMessage) {
  1105. v, _ := msg.Value.Encode()
  1106. expected := TestMessage + strconv.Itoa(i)
  1107. if string(v) != expected {
  1108. t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
  1109. }
  1110. },
  1111. },
  1112. {
  1113. name: "interceptor chain",
  1114. interceptors: []ProducerInterceptor{&appendInterceptor{i: 0}, &appendInterceptor{i: 1000}},
  1115. expectationFn: func(t *testing.T, i int, msg *ProducerMessage) {
  1116. v, _ := msg.Value.Encode()
  1117. expected := TestMessage + strconv.Itoa(i) + strconv.Itoa(i+1000)
  1118. if string(v) != expected {
  1119. t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
  1120. }
  1121. },
  1122. },
  1123. {
  1124. name: "interceptor chain with one interceptor failing",
  1125. interceptors: []ProducerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: 1000}},
  1126. expectationFn: func(t *testing.T, i int, msg *ProducerMessage) {
  1127. v, _ := msg.Value.Encode()
  1128. expected := TestMessage + strconv.Itoa(i+1000)
  1129. if string(v) != expected {
  1130. t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
  1131. }
  1132. },
  1133. },
  1134. {
  1135. name: "interceptor chain with all interceptors failing",
  1136. interceptors: []ProducerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: -1}},
  1137. expectationFn: func(t *testing.T, i int, msg *ProducerMessage) {
  1138. v, _ := msg.Value.Encode()
  1139. expected := TestMessage
  1140. if string(v) != expected {
  1141. t.Errorf("Interceptor should have not changed the value, got %s, expected %s", v, expected)
  1142. }
  1143. },
  1144. },
  1145. }
  1146. for _, tt := range tests {
  1147. t.Run(tt.name, func(t *testing.T) { testProducerInterceptor(t, tt.interceptors, tt.expectationFn) })
  1148. }
  1149. }
  1150. // This example shows how to use the producer while simultaneously
  1151. // reading the Errors channel to know about any failures.
  1152. func ExampleAsyncProducer_select() {
  1153. producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
  1154. if err != nil {
  1155. panic(err)
  1156. }
  1157. defer func() {
  1158. if err := producer.Close(); err != nil {
  1159. log.Fatalln(err)
  1160. }
  1161. }()
  1162. // Trap SIGINT to trigger a shutdown.
  1163. signals := make(chan os.Signal, 1)
  1164. signal.Notify(signals, os.Interrupt)
  1165. var enqueued, producerErrors int
  1166. ProducerLoop:
  1167. for {
  1168. select {
  1169. case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
  1170. enqueued++
  1171. case err := <-producer.Errors():
  1172. log.Println("Failed to produce message", err)
  1173. producerErrors++
  1174. case <-signals:
  1175. break ProducerLoop
  1176. }
  1177. }
  1178. log.Printf("Enqueued: %d; errors: %d\n", enqueued, producerErrors)
  1179. }
  1180. // This example shows how to use the producer with separate goroutines
  1181. // reading from the Successes and Errors channels. Note that in order
  1182. // for the Successes channel to be populated, you have to set
  1183. // config.Producer.Return.Successes to true.
  1184. func ExampleAsyncProducer_goroutines() {
  1185. config := NewConfig()
  1186. config.Producer.Return.Successes = true
  1187. producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
  1188. if err != nil {
  1189. panic(err)
  1190. }
  1191. // Trap SIGINT to trigger a graceful shutdown.
  1192. signals := make(chan os.Signal, 1)
  1193. signal.Notify(signals, os.Interrupt)
  1194. var (
  1195. wg sync.WaitGroup
  1196. enqueued, successes, producerErrors int
  1197. )
  1198. wg.Add(1)
  1199. go func() {
  1200. defer wg.Done()
  1201. for range producer.Successes() {
  1202. successes++
  1203. }
  1204. }()
  1205. wg.Add(1)
  1206. go func() {
  1207. defer wg.Done()
  1208. for err := range producer.Errors() {
  1209. log.Println(err)
  1210. producerErrors++
  1211. }
  1212. }()
  1213. ProducerLoop:
  1214. for {
  1215. message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
  1216. select {
  1217. case producer.Input() <- message:
  1218. enqueued++
  1219. case <-signals:
  1220. producer.AsyncClose() // Trigger a shutdown of the producer.
  1221. break ProducerLoop
  1222. }
  1223. }
  1224. wg.Wait()
  1225. log.Printf("Successfully produced: %d; errors: %d\n", successes, producerErrors)
  1226. }