async_producer_test.go 36 KB


  1. package sarama
  2. import (
  3. "errors"
  4. "log"
  5. "os"
  6. "os/signal"
  7. "sync"
  8. "sync/atomic"
  9. "testing"
  10. "time"
  11. )
  12. const TestMessage = "ABC THE MESSAGE"
  13. func closeProducer(t *testing.T, p AsyncProducer) {
  14. var wg sync.WaitGroup
  15. p.AsyncClose()
  16. wg.Add(2)
  17. go func() {
  18. for range p.Successes() {
  19. t.Error("Unexpected message on Successes()")
  20. }
  21. wg.Done()
  22. }()
  23. go func() {
  24. for msg := range p.Errors() {
  25. t.Error(msg.Err)
  26. }
  27. wg.Done()
  28. }()
  29. wg.Wait()
  30. }
  31. func expectResults(t *testing.T, p AsyncProducer, successes, errors int) {
  32. expect := successes + errors
  33. for expect > 0 {
  34. select {
  35. case msg := <-p.Errors():
  36. if msg.Msg.flags != 0 {
  37. t.Error("Message had flags set")
  38. }
  39. errors--
  40. expect--
  41. if errors < 0 {
  42. t.Error(msg.Err)
  43. }
  44. case msg := <-p.Successes():
  45. if msg.flags != 0 {
  46. t.Error("Message had flags set")
  47. }
  48. successes--
  49. expect--
  50. if successes < 0 {
  51. t.Error("Too many successes")
  52. }
  53. }
  54. }
  55. if successes != 0 || errors != 0 {
  56. t.Error("Unexpected successes", successes, "or errors", errors)
  57. }
  58. }
  59. type testPartitioner chan *int32
  60. func (p testPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
  61. part := <-p
  62. if part == nil {
  63. return 0, errors.New("BOOM")
  64. }
  65. return *part, nil
  66. }
  67. func (p testPartitioner) RequiresConsistency() bool {
  68. return true
  69. }
  70. func (p testPartitioner) feed(partition int32) {
  71. p <- &partition
  72. }
  73. type flakyEncoder bool
  74. func (f flakyEncoder) Length() int {
  75. return len(TestMessage)
  76. }
  77. func (f flakyEncoder) Encode() ([]byte, error) {
  78. if !bool(f) {
  79. return nil, errors.New("flaky encoding error")
  80. }
  81. return []byte(TestMessage), nil
  82. }
  83. func TestAsyncProducer(t *testing.T) {
  84. seedBroker := NewMockBroker(t, 1)
  85. leader := NewMockBroker(t, 2)
  86. metadataResponse := new(MetadataResponse)
  87. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  88. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  89. seedBroker.Returns(metadataResponse)
  90. prodSuccess := new(ProduceResponse)
  91. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  92. leader.Returns(prodSuccess)
  93. config := NewConfig()
  94. config.Producer.Flush.Messages = 10
  95. config.Producer.Return.Successes = true
  96. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  97. if err != nil {
  98. t.Fatal(err)
  99. }
  100. for i := 0; i < 10; i++ {
  101. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
  102. }
  103. for i := 0; i < 10; i++ {
  104. select {
  105. case msg := <-producer.Errors():
  106. t.Error(msg.Err)
  107. if msg.Msg.flags != 0 {
  108. t.Error("Message had flags set")
  109. }
  110. case msg := <-producer.Successes():
  111. if msg.flags != 0 {
  112. t.Error("Message had flags set")
  113. }
  114. if msg.Metadata.(int) != i {
  115. t.Error("Message metadata did not match")
  116. }
  117. case <-time.After(time.Second):
  118. t.Errorf("Timeout waiting for msg #%d", i)
  119. goto done
  120. }
  121. }
  122. done:
  123. closeProducer(t, producer)
  124. leader.Close()
  125. seedBroker.Close()
  126. }
  127. func TestAsyncProducerMultipleFlushes(t *testing.T) {
  128. seedBroker := NewMockBroker(t, 1)
  129. leader := NewMockBroker(t, 2)
  130. metadataResponse := new(MetadataResponse)
  131. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  132. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  133. seedBroker.Returns(metadataResponse)
  134. prodSuccess := new(ProduceResponse)
  135. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  136. leader.Returns(prodSuccess)
  137. leader.Returns(prodSuccess)
  138. leader.Returns(prodSuccess)
  139. config := NewConfig()
  140. config.Producer.Flush.Messages = 5
  141. config.Producer.Return.Successes = true
  142. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  143. if err != nil {
  144. t.Fatal(err)
  145. }
  146. for flush := 0; flush < 3; flush++ {
  147. for i := 0; i < 5; i++ {
  148. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  149. }
  150. expectResults(t, producer, 5, 0)
  151. }
  152. closeProducer(t, producer)
  153. leader.Close()
  154. seedBroker.Close()
  155. }
  156. func TestAsyncProducerMultipleBrokers(t *testing.T) {
  157. seedBroker := NewMockBroker(t, 1)
  158. leader0 := NewMockBroker(t, 2)
  159. leader1 := NewMockBroker(t, 3)
  160. metadataResponse := new(MetadataResponse)
  161. metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
  162. metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
  163. metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, nil, ErrNoError)
  164. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  165. seedBroker.Returns(metadataResponse)
  166. prodResponse0 := new(ProduceResponse)
  167. prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError)
  168. leader0.Returns(prodResponse0)
  169. prodResponse1 := new(ProduceResponse)
  170. prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
  171. leader1.Returns(prodResponse1)
  172. config := NewConfig()
  173. config.Producer.Flush.Messages = 5
  174. config.Producer.Return.Successes = true
  175. config.Producer.Partitioner = NewRoundRobinPartitioner
  176. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  177. if err != nil {
  178. t.Fatal(err)
  179. }
  180. for i := 0; i < 10; i++ {
  181. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  182. }
  183. expectResults(t, producer, 10, 0)
  184. closeProducer(t, producer)
  185. leader1.Close()
  186. leader0.Close()
  187. seedBroker.Close()
  188. }
  189. func TestAsyncProducerCustomPartitioner(t *testing.T) {
  190. seedBroker := NewMockBroker(t, 1)
  191. leader := NewMockBroker(t, 2)
  192. metadataResponse := new(MetadataResponse)
  193. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  194. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  195. seedBroker.Returns(metadataResponse)
  196. prodResponse := new(ProduceResponse)
  197. prodResponse.AddTopicPartition("my_topic", 0, ErrNoError)
  198. leader.Returns(prodResponse)
  199. config := NewConfig()
  200. config.Producer.Flush.Messages = 2
  201. config.Producer.Return.Successes = true
  202. config.Producer.Partitioner = func(topic string) Partitioner {
  203. p := make(testPartitioner)
  204. go func() {
  205. p.feed(0)
  206. p <- nil
  207. p <- nil
  208. p <- nil
  209. p.feed(0)
  210. }()
  211. return p
  212. }
  213. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  214. if err != nil {
  215. t.Fatal(err)
  216. }
  217. for i := 0; i < 5; i++ {
  218. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  219. }
  220. expectResults(t, producer, 2, 3)
  221. closeProducer(t, producer)
  222. leader.Close()
  223. seedBroker.Close()
  224. }
  225. func TestAsyncProducerFailureRetry(t *testing.T) {
  226. seedBroker := NewMockBroker(t, 1)
  227. leader1 := NewMockBroker(t, 2)
  228. leader2 := NewMockBroker(t, 3)
  229. metadataLeader1 := new(MetadataResponse)
  230. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  231. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  232. seedBroker.Returns(metadataLeader1)
  233. config := NewConfig()
  234. config.Producer.Flush.Messages = 10
  235. config.Producer.Return.Successes = true
  236. config.Producer.Retry.Backoff = 0
  237. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  238. if err != nil {
  239. t.Fatal(err)
  240. }
  241. seedBroker.Close()
  242. for i := 0; i < 10; i++ {
  243. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  244. }
  245. prodNotLeader := new(ProduceResponse)
  246. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  247. leader1.Returns(prodNotLeader)
  248. metadataLeader2 := new(MetadataResponse)
  249. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  250. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  251. leader1.Returns(metadataLeader2)
  252. prodSuccess := new(ProduceResponse)
  253. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  254. leader2.Returns(prodSuccess)
  255. expectResults(t, producer, 10, 0)
  256. leader1.Close()
  257. for i := 0; i < 10; i++ {
  258. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  259. }
  260. leader2.Returns(prodSuccess)
  261. expectResults(t, producer, 10, 0)
  262. leader2.Close()
  263. closeProducer(t, producer)
  264. }
  265. func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
  266. tt := func(t *testing.T, kErr KError) {
  267. seedBroker := NewMockBroker(t, 1)
  268. leader1 := NewMockBroker(t, 2)
  269. leader2 := NewMockBroker(t, 3)
  270. metadataLeader1 := new(MetadataResponse)
  271. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  272. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  273. metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  274. seedBroker.Returns(metadataLeader1)
  275. config := NewConfig()
  276. config.Producer.Flush.Messages = 2
  277. config.Producer.Return.Successes = true
  278. config.Producer.Retry.Max = 0 // disable!
  279. config.Producer.Retry.Backoff = 0
  280. config.Producer.Partitioner = NewManualPartitioner
  281. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  282. if err != nil {
  283. t.Fatal(err)
  284. }
  285. seedBroker.Close()
  286. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  287. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
  288. prodNotLeader := new(ProduceResponse)
  289. prodNotLeader.AddTopicPartition("my_topic", 0, kErr)
  290. prodNotLeader.AddTopicPartition("my_topic", 1, kErr)
  291. leader1.Returns(prodNotLeader)
  292. expectResults(t, producer, 0, 2)
  293. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  294. metadataLeader2 := new(MetadataResponse)
  295. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  296. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  297. metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  298. leader1.Returns(metadataLeader2)
  299. leader1.Returns(metadataLeader2)
  300. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 1}
  301. prodSuccess := new(ProduceResponse)
  302. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  303. prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
  304. leader2.Returns(prodSuccess)
  305. expectResults(t, producer, 2, 0)
  306. leader1.Close()
  307. leader2.Close()
  308. closeProducer(t, producer)
  309. }
  310. t.Run("retriable error", func(t *testing.T) {
  311. tt(t, ErrNotLeaderForPartition)
  312. })
  313. t.Run("non-retriable error", func(t *testing.T) {
  314. tt(t, ErrNotController)
  315. })
  316. }
  317. func TestAsyncProducerEncoderFailures(t *testing.T) {
  318. seedBroker := NewMockBroker(t, 1)
  319. leader := NewMockBroker(t, 2)
  320. metadataResponse := new(MetadataResponse)
  321. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  322. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  323. seedBroker.Returns(metadataResponse)
  324. prodSuccess := new(ProduceResponse)
  325. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  326. leader.Returns(prodSuccess)
  327. leader.Returns(prodSuccess)
  328. leader.Returns(prodSuccess)
  329. config := NewConfig()
  330. config.Producer.Flush.Messages = 1
  331. config.Producer.Return.Successes = true
  332. config.Producer.Partitioner = NewManualPartitioner
  333. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  334. if err != nil {
  335. t.Fatal(err)
  336. }
  337. for flush := 0; flush < 3; flush++ {
  338. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)}
  339. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)}
  340. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
  341. expectResults(t, producer, 1, 2)
  342. }
  343. closeProducer(t, producer)
  344. leader.Close()
  345. seedBroker.Close()
  346. }
  347. // If a Kafka broker becomes unavailable and then returns back in service, then
  348. // producer reconnects to it and continues sending messages.
  349. func TestAsyncProducerBrokerBounce(t *testing.T) {
  350. // Given
  351. seedBroker := NewMockBroker(t, 1)
  352. leader := NewMockBroker(t, 2)
  353. leaderAddr := leader.Addr()
  354. metadataResponse := new(MetadataResponse)
  355. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  356. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  357. seedBroker.Returns(metadataResponse)
  358. prodSuccess := new(ProduceResponse)
  359. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  360. config := NewConfig()
  361. config.Producer.Flush.Messages = 1
  362. config.Producer.Return.Successes = true
  363. config.Producer.Retry.Backoff = 0
  364. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  365. if err != nil {
  366. t.Fatal(err)
  367. }
  368. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  369. leader.Returns(prodSuccess)
  370. expectResults(t, producer, 1, 0)
  371. // When: a broker connection gets reset by a broker (network glitch, restart, you name it).
  372. leader.Close() // producer should get EOF
  373. leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
  374. seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again
  375. // Then: a produced message goes through the new broker connection.
  376. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  377. leader.Returns(prodSuccess)
  378. expectResults(t, producer, 1, 0)
  379. closeProducer(t, producer)
  380. seedBroker.Close()
  381. leader.Close()
  382. }
  383. func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
  384. seedBroker := NewMockBroker(t, 1)
  385. leader1 := NewMockBroker(t, 2)
  386. leader2 := NewMockBroker(t, 3)
  387. metadataLeader1 := new(MetadataResponse)
  388. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  389. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  390. seedBroker.Returns(metadataLeader1)
  391. config := NewConfig()
  392. config.Producer.Flush.Messages = 10
  393. config.Producer.Return.Successes = true
  394. config.Producer.Retry.Max = 3
  395. config.Producer.Retry.Backoff = 0
  396. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  397. if err != nil {
  398. t.Fatal(err)
  399. }
  400. for i := 0; i < 10; i++ {
  401. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  402. }
  403. leader1.Close() // producer should get EOF
  404. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  405. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  406. // ok fine, tell it to go to leader2 finally
  407. metadataLeader2 := new(MetadataResponse)
  408. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  409. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  410. seedBroker.Returns(metadataLeader2)
  411. prodSuccess := new(ProduceResponse)
  412. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  413. leader2.Returns(prodSuccess)
  414. expectResults(t, producer, 10, 0)
  415. seedBroker.Close()
  416. leader2.Close()
  417. closeProducer(t, producer)
  418. }
  419. func TestAsyncProducerMultipleRetries(t *testing.T) {
  420. seedBroker := NewMockBroker(t, 1)
  421. leader1 := NewMockBroker(t, 2)
  422. leader2 := NewMockBroker(t, 3)
  423. metadataLeader1 := new(MetadataResponse)
  424. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  425. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  426. seedBroker.Returns(metadataLeader1)
  427. config := NewConfig()
  428. config.Producer.Flush.Messages = 10
  429. config.Producer.Return.Successes = true
  430. config.Producer.Retry.Max = 4
  431. config.Producer.Retry.Backoff = 0
  432. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  433. if err != nil {
  434. t.Fatal(err)
  435. }
  436. for i := 0; i < 10; i++ {
  437. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  438. }
  439. prodNotLeader := new(ProduceResponse)
  440. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  441. leader1.Returns(prodNotLeader)
  442. metadataLeader2 := new(MetadataResponse)
  443. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  444. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  445. seedBroker.Returns(metadataLeader2)
  446. leader2.Returns(prodNotLeader)
  447. seedBroker.Returns(metadataLeader1)
  448. leader1.Returns(prodNotLeader)
  449. seedBroker.Returns(metadataLeader1)
  450. leader1.Returns(prodNotLeader)
  451. seedBroker.Returns(metadataLeader2)
  452. prodSuccess := new(ProduceResponse)
  453. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  454. leader2.Returns(prodSuccess)
  455. expectResults(t, producer, 10, 0)
  456. for i := 0; i < 10; i++ {
  457. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  458. }
  459. leader2.Returns(prodSuccess)
  460. expectResults(t, producer, 10, 0)
  461. seedBroker.Close()
  462. leader1.Close()
  463. leader2.Close()
  464. closeProducer(t, producer)
  465. }
  466. func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
  467. seedBroker := NewMockBroker(t, 1)
  468. leader1 := NewMockBroker(t, 2)
  469. leader2 := NewMockBroker(t, 3)
  470. metadataLeader1 := new(MetadataResponse)
  471. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  472. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
  473. seedBroker.Returns(metadataLeader1)
  474. config := NewConfig()
  475. config.Producer.Flush.Messages = 1
  476. config.Producer.Return.Successes = true
  477. config.Producer.Retry.Max = 4
  478. backoffCalled := make([]int32, config.Producer.Retry.Max+1)
  479. config.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
  480. atomic.AddInt32(&backoffCalled[retries-1], 1)
  481. return 0
  482. }
  483. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  484. if err != nil {
  485. t.Fatal(err)
  486. }
  487. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  488. prodNotLeader := new(ProduceResponse)
  489. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  490. prodSuccess := new(ProduceResponse)
  491. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  492. metadataLeader2 := new(MetadataResponse)
  493. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  494. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
  495. leader1.Returns(prodNotLeader)
  496. seedBroker.Returns(metadataLeader2)
  497. leader2.Returns(prodNotLeader)
  498. seedBroker.Returns(metadataLeader1)
  499. leader1.Returns(prodNotLeader)
  500. seedBroker.Returns(metadataLeader1)
  501. leader1.Returns(prodNotLeader)
  502. seedBroker.Returns(metadataLeader2)
  503. leader2.Returns(prodSuccess)
  504. expectResults(t, producer, 1, 0)
  505. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  506. leader2.Returns(prodSuccess)
  507. expectResults(t, producer, 1, 0)
  508. seedBroker.Close()
  509. leader1.Close()
  510. leader2.Close()
  511. closeProducer(t, producer)
  512. for i := 0; i < config.Producer.Retry.Max; i++ {
  513. if atomic.LoadInt32(&backoffCalled[i]) != 1 {
  514. t.Errorf("expected one retry attempt #%d", i)
  515. }
  516. }
  517. if atomic.LoadInt32(&backoffCalled[config.Producer.Retry.Max]) != 0 {
  518. t.Errorf("expected no retry attempt #%d", config.Producer.Retry.Max)
  519. }
  520. }
  521. func TestAsyncProducerOutOfRetries(t *testing.T) {
  522. t.Skip("Enable once bug #294 is fixed.")
  523. seedBroker := NewMockBroker(t, 1)
  524. leader := NewMockBroker(t, 2)
  525. metadataResponse := new(MetadataResponse)
  526. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  527. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  528. seedBroker.Returns(metadataResponse)
  529. config := NewConfig()
  530. config.Producer.Flush.Messages = 10
  531. config.Producer.Return.Successes = true
  532. config.Producer.Retry.Backoff = 0
  533. config.Producer.Retry.Max = 0
  534. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  535. if err != nil {
  536. t.Fatal(err)
  537. }
  538. for i := 0; i < 10; i++ {
  539. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  540. }
  541. prodNotLeader := new(ProduceResponse)
  542. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  543. leader.Returns(prodNotLeader)
  544. for i := 0; i < 10; i++ {
  545. select {
  546. case msg := <-producer.Errors():
  547. if msg.Err != ErrNotLeaderForPartition {
  548. t.Error(msg.Err)
  549. }
  550. case <-producer.Successes():
  551. t.Error("Unexpected success")
  552. }
  553. }
  554. seedBroker.Returns(metadataResponse)
  555. for i := 0; i < 10; i++ {
  556. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  557. }
  558. prodSuccess := new(ProduceResponse)
  559. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  560. leader.Returns(prodSuccess)
  561. expectResults(t, producer, 10, 0)
  562. leader.Close()
  563. seedBroker.Close()
  564. safeClose(t, producer)
  565. }
  566. func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
  567. seedBroker := NewMockBroker(t, 1)
  568. leader := NewMockBroker(t, 2)
  569. leaderAddr := leader.Addr()
  570. metadataResponse := new(MetadataResponse)
  571. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  572. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  573. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, nil, ErrNoError)
  574. seedBroker.Returns(metadataResponse)
  575. config := NewConfig()
  576. config.Producer.Return.Successes = true
  577. config.Producer.Retry.Backoff = 0
  578. config.Producer.Retry.Max = 1
  579. config.Producer.Partitioner = NewRoundRobinPartitioner
  580. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  581. if err != nil {
  582. t.Fatal(err)
  583. }
  584. // prime partition 0
  585. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  586. prodSuccess := new(ProduceResponse)
  587. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  588. leader.Returns(prodSuccess)
  589. expectResults(t, producer, 1, 0)
  590. // prime partition 1
  591. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  592. prodSuccess = new(ProduceResponse)
  593. prodSuccess.AddTopicPartition("my_topic", 1, ErrNoError)
  594. leader.Returns(prodSuccess)
  595. expectResults(t, producer, 1, 0)
  596. // reboot the broker (the producer will get EOF on its existing connection)
  597. leader.Close()
  598. leader = NewMockBrokerAddr(t, 2, leaderAddr)
  599. // send another message on partition 0 to trigger the EOF and retry
  600. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  601. // tell partition 0 to go to that broker again
  602. seedBroker.Returns(metadataResponse)
  603. // succeed this time
  604. prodSuccess = new(ProduceResponse)
  605. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  606. leader.Returns(prodSuccess)
  607. expectResults(t, producer, 1, 0)
  608. // shutdown
  609. closeProducer(t, producer)
  610. seedBroker.Close()
  611. leader.Close()
  612. }
  613. func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
  614. seedBroker := NewMockBroker(t, 1)
  615. leader := NewMockBroker(t, 2)
  616. metadataResponse := new(MetadataResponse)
  617. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  618. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  619. metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, nil, ErrNoError)
  620. seedBroker.Returns(metadataResponse)
  621. config := NewConfig()
  622. config.Producer.Flush.Messages = 5
  623. config.Producer.Return.Successes = true
  624. config.Producer.Retry.Backoff = 0
  625. config.Producer.Retry.Max = 1
  626. config.Producer.Partitioner = NewManualPartitioner
  627. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  628. if err != nil {
  629. t.Fatal(err)
  630. }
  631. // prime partitions
  632. for p := int32(0); p < 2; p++ {
  633. for i := 0; i < 5; i++ {
  634. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: p}
  635. }
  636. prodSuccess := new(ProduceResponse)
  637. prodSuccess.AddTopicPartition("my_topic", p, ErrNoError)
  638. leader.Returns(prodSuccess)
  639. expectResults(t, producer, 5, 0)
  640. }
  641. // send more messages on partition 0
  642. for i := 0; i < 5; i++ {
  643. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  644. }
  645. prodNotLeader := new(ProduceResponse)
  646. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  647. leader.Returns(prodNotLeader)
  648. time.Sleep(50 * time.Millisecond)
  649. leader.SetHandlerByMap(map[string]MockResponse{
  650. "ProduceRequest": NewMockProduceResponse(t).
  651. SetVersion(0).
  652. SetError("my_topic", 0, ErrNoError),
  653. })
  654. // tell partition 0 to go to that broker again
  655. seedBroker.Returns(metadataResponse)
  656. // succeed this time
  657. expectResults(t, producer, 5, 0)
  658. // put five more through
  659. for i := 0; i < 5; i++ {
  660. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
  661. }
  662. expectResults(t, producer, 5, 0)
  663. // shutdown
  664. closeProducer(t, producer)
  665. seedBroker.Close()
  666. leader.Close()
  667. }
  668. func TestAsyncProducerRetryShutdown(t *testing.T) {
  669. seedBroker := NewMockBroker(t, 1)
  670. leader := NewMockBroker(t, 2)
  671. metadataLeader := new(MetadataResponse)
  672. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  673. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  674. seedBroker.Returns(metadataLeader)
  675. config := NewConfig()
  676. config.Producer.Flush.Messages = 10
  677. config.Producer.Return.Successes = true
  678. config.Producer.Retry.Backoff = 0
  679. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  680. if err != nil {
  681. t.Fatal(err)
  682. }
  683. for i := 0; i < 10; i++ {
  684. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  685. }
  686. producer.AsyncClose()
  687. time.Sleep(5 * time.Millisecond) // let the shutdown goroutine kick in
  688. producer.Input() <- &ProducerMessage{Topic: "FOO"}
  689. if err := <-producer.Errors(); err.Err != ErrShuttingDown {
  690. t.Error(err)
  691. }
  692. prodNotLeader := new(ProduceResponse)
  693. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  694. leader.Returns(prodNotLeader)
  695. seedBroker.Returns(metadataLeader)
  696. prodSuccess := new(ProduceResponse)
  697. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  698. leader.Returns(prodSuccess)
  699. expectResults(t, producer, 10, 0)
  700. seedBroker.Close()
  701. leader.Close()
  702. // wait for the async-closed producer to shut down fully
  703. for err := range producer.Errors() {
  704. t.Error(err)
  705. }
  706. }
  707. func TestAsyncProducerNoReturns(t *testing.T) {
  708. seedBroker := NewMockBroker(t, 1)
  709. leader := NewMockBroker(t, 2)
  710. metadataLeader := new(MetadataResponse)
  711. metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
  712. metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
  713. seedBroker.Returns(metadataLeader)
  714. config := NewConfig()
  715. config.Producer.Flush.Messages = 10
  716. config.Producer.Return.Successes = false
  717. config.Producer.Return.Errors = false
  718. config.Producer.Retry.Backoff = 0
  719. producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
  720. if err != nil {
  721. t.Fatal(err)
  722. }
  723. for i := 0; i < 10; i++ {
  724. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  725. }
  726. wait := make(chan bool)
  727. go func() {
  728. if err := producer.Close(); err != nil {
  729. t.Error(err)
  730. }
  731. close(wait)
  732. }()
  733. prodSuccess := new(ProduceResponse)
  734. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  735. leader.Returns(prodSuccess)
  736. <-wait
  737. seedBroker.Close()
  738. leader.Close()
  739. }
  740. func TestAsyncProducerIdempotentGoldenPath(t *testing.T) {
  741. broker := NewMockBroker(t, 1)
  742. metadataResponse := &MetadataResponse{
  743. Version: 1,
  744. ControllerID: 1,
  745. }
  746. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  747. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
  748. broker.Returns(metadataResponse)
  749. initProducerID := &InitProducerIDResponse{
  750. ThrottleTime: 0,
  751. ProducerID: 1000,
  752. ProducerEpoch: 1,
  753. }
  754. broker.Returns(initProducerID)
  755. config := NewConfig()
  756. config.Producer.Flush.Messages = 10
  757. config.Producer.Return.Successes = true
  758. config.Producer.Retry.Max = 4
  759. config.Producer.RequiredAcks = WaitForAll
  760. config.Producer.Retry.Backoff = 0
  761. config.Producer.Idempotent = true
  762. config.Net.MaxOpenRequests = 1
  763. config.Version = V0_11_0_0
  764. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  765. if err != nil {
  766. t.Fatal(err)
  767. }
  768. for i := 0; i < 10; i++ {
  769. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  770. }
  771. prodSuccess := &ProduceResponse{
  772. Version: 3,
  773. ThrottleTime: 0,
  774. }
  775. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  776. broker.Returns(prodSuccess)
  777. expectResults(t, producer, 10, 0)
  778. broker.Close()
  779. closeProducer(t, producer)
  780. }
  781. func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) {
  782. //Logger = log.New(os.Stderr, "", log.LstdFlags)
  783. tests := []struct {
  784. name string
  785. failAfterWrite bool
  786. }{
  787. {"FailAfterWrite", true},
  788. {"FailBeforeWrite", false},
  789. }
  790. for _, test := range tests {
  791. broker := NewMockBroker(t, 1)
  792. metadataResponse := &MetadataResponse{
  793. Version: 1,
  794. ControllerID: 1,
  795. }
  796. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  797. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
  798. initProducerIDResponse := &InitProducerIDResponse{
  799. ThrottleTime: 0,
  800. ProducerID: 1000,
  801. ProducerEpoch: 1,
  802. }
  803. prodNotLeaderResponse := &ProduceResponse{
  804. Version: 3,
  805. ThrottleTime: 0,
  806. }
  807. prodNotLeaderResponse.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas)
  808. prodDuplicate := &ProduceResponse{
  809. Version: 3,
  810. ThrottleTime: 0,
  811. }
  812. prodDuplicate.AddTopicPartition("my_topic", 0, ErrDuplicateSequenceNumber)
  813. prodOutOfSeq := &ProduceResponse{
  814. Version: 3,
  815. ThrottleTime: 0,
  816. }
  817. prodOutOfSeq.AddTopicPartition("my_topic", 0, ErrOutOfOrderSequenceNumber)
  818. prodSuccessResponse := &ProduceResponse{
  819. Version: 3,
  820. ThrottleTime: 0,
  821. }
  822. prodSuccessResponse.AddTopicPartition("my_topic", 0, ErrNoError)
  823. prodCounter := 0
  824. lastBatchFirstSeq := -1
  825. lastBatchSize := -1
  826. lastSequenceWrittenToDisk := -1
  827. handlerFailBeforeWrite := func(req *request) (res encoder) {
  828. switch req.body.key() {
  829. case 3:
  830. return metadataResponse
  831. case 22:
  832. return initProducerIDResponse
  833. case 0:
  834. prodCounter++
  835. preq := req.body.(*ProduceRequest)
  836. batch := preq.records["my_topic"][0].RecordBatch
  837. batchFirstSeq := int(batch.FirstSequence)
  838. batchSize := len(batch.Records)
  839. if lastSequenceWrittenToDisk == batchFirstSeq-1 { //in sequence append
  840. if lastBatchFirstSeq == batchFirstSeq { //is a batch retry
  841. if lastBatchSize == batchSize { //good retry
  842. // mock write to disk
  843. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  844. return prodSuccessResponse
  845. }
  846. t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
  847. return prodOutOfSeq
  848. } // not a retry
  849. // save batch just received for future check
  850. lastBatchFirstSeq = batchFirstSeq
  851. lastBatchSize = batchSize
  852. if prodCounter%2 == 1 {
  853. if test.failAfterWrite {
  854. // mock write to disk
  855. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  856. }
  857. return prodNotLeaderResponse
  858. }
  859. // mock write to disk
  860. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  861. return prodSuccessResponse
  862. }
  863. if lastBatchFirstSeq == batchFirstSeq && lastBatchSize == batchSize { // is a good batch retry
  864. if lastSequenceWrittenToDisk == (batchFirstSeq + batchSize - 1) { // we already have the messages
  865. return prodDuplicate
  866. }
  867. // mock write to disk
  868. lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
  869. return prodSuccessResponse
  870. } //out of sequence / bad retried batch
  871. if lastBatchFirstSeq == batchFirstSeq && lastBatchSize != batchSize {
  872. t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
  873. } else if lastSequenceWrittenToDisk+1 != batchFirstSeq {
  874. t.Errorf("[%s] Out of sequence message lastSequence=%d new batch starts at=%d", test.name, lastSequenceWrittenToDisk, batchFirstSeq)
  875. } else {
  876. t.Errorf("[%s] Unexpected error", test.name)
  877. }
  878. return prodOutOfSeq
  879. }
  880. return nil
  881. }
  882. config := NewConfig()
  883. config.Version = V0_11_0_0
  884. config.Producer.Idempotent = true
  885. config.Net.MaxOpenRequests = 1
  886. config.Producer.RequiredAcks = WaitForAll
  887. config.Producer.Return.Successes = true
  888. config.Producer.Flush.Frequency = 50 * time.Millisecond
  889. config.Producer.Retry.Backoff = 100 * time.Millisecond
  890. broker.setHandler(handlerFailBeforeWrite)
  891. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  892. if err != nil {
  893. t.Fatal(err)
  894. }
  895. for i := 0; i < 3; i++ {
  896. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  897. }
  898. go func() {
  899. for i := 0; i < 7; i++ {
  900. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("goroutine")}
  901. time.Sleep(100 * time.Millisecond)
  902. }
  903. }()
  904. expectResults(t, producer, 10, 0)
  905. broker.Close()
  906. closeProducer(t, producer)
  907. }
  908. }
  909. func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
  910. broker := NewMockBroker(t, 1)
  911. metadataResponse := &MetadataResponse{
  912. Version: 1,
  913. ControllerID: 1,
  914. }
  915. metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
  916. metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
  917. broker.Returns(metadataResponse)
  918. initProducerID := &InitProducerIDResponse{
  919. ThrottleTime: 0,
  920. ProducerID: 1000,
  921. ProducerEpoch: 1,
  922. }
  923. broker.Returns(initProducerID)
  924. config := NewConfig()
  925. config.Producer.Flush.Messages = 10
  926. config.Producer.Return.Successes = true
  927. config.Producer.Retry.Max = 400000
  928. config.Producer.RequiredAcks = WaitForAll
  929. config.Producer.Retry.Backoff = 0
  930. config.Producer.Idempotent = true
  931. config.Net.MaxOpenRequests = 1
  932. config.Version = V0_11_0_0
  933. producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
  934. if err != nil {
  935. t.Fatal(err)
  936. }
  937. for i := 0; i < 10; i++ {
  938. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  939. }
  940. prodOutOfSeq := &ProduceResponse{
  941. Version: 3,
  942. ThrottleTime: 0,
  943. }
  944. prodOutOfSeq.AddTopicPartition("my_topic", 0, ErrOutOfOrderSequenceNumber)
  945. broker.Returns(prodOutOfSeq)
  946. expectResults(t, producer, 0, 10)
  947. broker.Close()
  948. closeProducer(t, producer)
  949. }
  950. // This example shows how to use the producer while simultaneously
  951. // reading the Errors channel to know about any failures.
  952. func ExampleAsyncProducer_select() {
  953. producer, err := NewAsyncProducer([]string{"localhost:9092"}, nil)
  954. if err != nil {
  955. panic(err)
  956. }
  957. defer func() {
  958. if err := producer.Close(); err != nil {
  959. log.Fatalln(err)
  960. }
  961. }()
  962. // Trap SIGINT to trigger a shutdown.
  963. signals := make(chan os.Signal, 1)
  964. signal.Notify(signals, os.Interrupt)
  965. var enqueued, errors int
  966. ProducerLoop:
  967. for {
  968. select {
  969. case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
  970. enqueued++
  971. case err := <-producer.Errors():
  972. log.Println("Failed to produce message", err)
  973. errors++
  974. case <-signals:
  975. break ProducerLoop
  976. }
  977. }
  978. log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
  979. }
  980. // This example shows how to use the producer with separate goroutines
  981. // reading from the Successes and Errors channels. Note that in order
  982. // for the Successes channel to be populated, you have to set
  983. // config.Producer.Return.Successes to true.
  984. func ExampleAsyncProducer_goroutines() {
  985. config := NewConfig()
  986. config.Producer.Return.Successes = true
  987. producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
  988. if err != nil {
  989. panic(err)
  990. }
  991. // Trap SIGINT to trigger a graceful shutdown.
  992. signals := make(chan os.Signal, 1)
  993. signal.Notify(signals, os.Interrupt)
  994. var (
  995. wg sync.WaitGroup
  996. enqueued, successes, errors int
  997. )
  998. wg.Add(1)
  999. go func() {
  1000. defer wg.Done()
  1001. for range producer.Successes() {
  1002. successes++
  1003. }
  1004. }()
  1005. wg.Add(1)
  1006. go func() {
  1007. defer wg.Done()
  1008. for err := range producer.Errors() {
  1009. log.Println(err)
  1010. errors++
  1011. }
  1012. }()
  1013. ProducerLoop:
  1014. for {
  1015. message := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
  1016. select {
  1017. case producer.Input() <- message:
  1018. enqueued++
  1019. case <-signals:
  1020. producer.AsyncClose() // Trigger a shutdown of the producer.
  1021. break ProducerLoop
  1022. }
  1023. }
  1024. wg.Wait()
  1025. log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
  1026. }