producer_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610
  1. package sarama
  2. import (
  3. "fmt"
  4. "sync"
  5. "testing"
  6. )
  7. const TestMessage = "ABC THE MESSAGE"
  8. func closeProducer(t *testing.T, p *Producer) {
  9. var wg sync.WaitGroup
  10. p.AsyncClose()
  11. wg.Add(2)
  12. go func() {
  13. for _ = range p.Successes() {
  14. t.Error("Unexpected message on Successes()")
  15. }
  16. wg.Done()
  17. }()
  18. go func() {
  19. for msg := range p.Errors() {
  20. t.Error(msg.Err)
  21. }
  22. wg.Done()
  23. }()
  24. wg.Wait()
  25. }
  26. func TestDefaultProducerConfigValidates(t *testing.T) {
  27. config := NewProducerConfig()
  28. if err := config.Validate(); err != nil {
  29. t.Error(err)
  30. }
  31. }
  32. func TestSimpleProducer(t *testing.T) {
  33. seedBroker := newMockBroker(t, 1)
  34. leader := newMockBroker(t, 2)
  35. metadataResponse := new(MetadataResponse)
  36. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  37. metadataResponse.AddTopicPartition("my_topic", 0, 2, nil, nil, ErrNoError)
  38. seedBroker.Returns(metadataResponse)
  39. prodSuccess := new(ProduceResponse)
  40. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  41. for i := 0; i < 10; i++ {
  42. leader.Returns(prodSuccess)
  43. }
  44. client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
  45. if err != nil {
  46. t.Fatal(err)
  47. }
  48. producer, err := NewSimpleProducer(client, nil)
  49. if err != nil {
  50. t.Fatal(err)
  51. }
  52. for i := 0; i < 10; i++ {
  53. err = producer.SendMessage("my_topic", nil, StringEncoder(TestMessage))
  54. if err != nil {
  55. t.Error(err)
  56. }
  57. }
  58. safeClose(t, producer)
  59. safeClose(t, client)
  60. leader.Close()
  61. seedBroker.Close()
  62. }
  63. func TestConcurrentSimpleProducer(t *testing.T) {
  64. seedBroker := newMockBroker(t, 1)
  65. leader := newMockBroker(t, 2)
  66. metadataResponse := new(MetadataResponse)
  67. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  68. metadataResponse.AddTopicPartition("my_topic", 0, 2, nil, nil, ErrNoError)
  69. seedBroker.Returns(metadataResponse)
  70. prodSuccess := new(ProduceResponse)
  71. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  72. leader.Returns(prodSuccess)
  73. client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
  74. if err != nil {
  75. t.Fatal(err)
  76. }
  77. config := NewProducerConfig()
  78. config.FlushMsgCount = 100
  79. producer, err := NewSimpleProducer(client, config)
  80. if err != nil {
  81. t.Fatal(err)
  82. }
  83. wg := sync.WaitGroup{}
  84. for i := 0; i < 100; i++ {
  85. wg.Add(1)
  86. go func() {
  87. err := producer.SendMessage("my_topic", nil, StringEncoder(TestMessage))
  88. if err != nil {
  89. t.Error(err)
  90. }
  91. wg.Done()
  92. }()
  93. }
  94. wg.Wait()
  95. safeClose(t, producer)
  96. safeClose(t, client)
  97. leader.Close()
  98. seedBroker.Close()
  99. }
  100. func TestProducer(t *testing.T) {
  101. seedBroker := newMockBroker(t, 1)
  102. leader := newMockBroker(t, 2)
  103. metadataResponse := new(MetadataResponse)
  104. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  105. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  106. seedBroker.Returns(metadataResponse)
  107. prodSuccess := new(ProduceResponse)
  108. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  109. leader.Returns(prodSuccess)
  110. client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
  111. if err != nil {
  112. t.Fatal(err)
  113. }
  114. config := NewProducerConfig()
  115. config.FlushMsgCount = 10
  116. config.AckSuccesses = true
  117. producer, err := NewProducer(client, config)
  118. if err != nil {
  119. t.Fatal(err)
  120. }
  121. for i := 0; i < 10; i++ {
  122. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
  123. }
  124. for i := 0; i < 10; i++ {
  125. select {
  126. case msg := <-producer.Errors():
  127. t.Error(msg.Err)
  128. if msg.Msg.flags != 0 {
  129. t.Error("Message had flags set")
  130. }
  131. case msg := <-producer.Successes():
  132. if msg.flags != 0 {
  133. t.Error("Message had flags set")
  134. }
  135. if msg.Metadata.(int) != i {
  136. t.Error("Message metadata did not match")
  137. }
  138. }
  139. }
  140. closeProducer(t, producer)
  141. safeClose(t, client)
  142. leader.Close()
  143. seedBroker.Close()
  144. }
  145. func TestProducerMultipleFlushes(t *testing.T) {
  146. seedBroker := newMockBroker(t, 1)
  147. leader := newMockBroker(t, 2)
  148. metadataResponse := new(MetadataResponse)
  149. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  150. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  151. seedBroker.Returns(metadataResponse)
  152. prodSuccess := new(ProduceResponse)
  153. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  154. leader.Returns(prodSuccess)
  155. leader.Returns(prodSuccess)
  156. leader.Returns(prodSuccess)
  157. client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
  158. if err != nil {
  159. t.Fatal(err)
  160. }
  161. config := NewProducerConfig()
  162. config.FlushMsgCount = 5
  163. config.AckSuccesses = true
  164. producer, err := NewProducer(client, config)
  165. if err != nil {
  166. t.Fatal(err)
  167. }
  168. for flush := 0; flush < 3; flush++ {
  169. for i := 0; i < 5; i++ {
  170. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  171. }
  172. for i := 0; i < 5; i++ {
  173. select {
  174. case msg := <-producer.Errors():
  175. t.Error(msg.Err)
  176. if msg.Msg.flags != 0 {
  177. t.Error("Message had flags set")
  178. }
  179. case msg := <-producer.Successes():
  180. if msg.flags != 0 {
  181. t.Error("Message had flags set")
  182. }
  183. }
  184. }
  185. }
  186. closeProducer(t, producer)
  187. safeClose(t, client)
  188. leader.Close()
  189. seedBroker.Close()
  190. }
  191. func TestProducerMultipleBrokers(t *testing.T) {
  192. seedBroker := newMockBroker(t, 1)
  193. leader0 := newMockBroker(t, 2)
  194. leader1 := newMockBroker(t, 3)
  195. metadataResponse := new(MetadataResponse)
  196. metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
  197. metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
  198. metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
  199. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
  200. seedBroker.Returns(metadataResponse)
  201. prodResponse0 := new(ProduceResponse)
  202. prodResponse0.AddTopicPartition("my_topic", 0, ErrNoError)
  203. leader0.Returns(prodResponse0)
  204. prodResponse1 := new(ProduceResponse)
  205. prodResponse1.AddTopicPartition("my_topic", 1, ErrNoError)
  206. leader1.Returns(prodResponse1)
  207. client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
  208. if err != nil {
  209. t.Fatal(err)
  210. }
  211. config := NewProducerConfig()
  212. config.FlushMsgCount = 5
  213. config.AckSuccesses = true
  214. config.Partitioner = NewRoundRobinPartitioner
  215. producer, err := NewProducer(client, config)
  216. if err != nil {
  217. t.Fatal(err)
  218. }
  219. for i := 0; i < 10; i++ {
  220. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  221. }
  222. for i := 0; i < 10; i++ {
  223. select {
  224. case msg := <-producer.Errors():
  225. t.Error(msg.Err)
  226. if msg.Msg.flags != 0 {
  227. t.Error("Message had flags set")
  228. }
  229. case msg := <-producer.Successes():
  230. if msg.flags != 0 {
  231. t.Error("Message had flags set")
  232. }
  233. }
  234. }
  235. closeProducer(t, producer)
  236. safeClose(t, client)
  237. leader1.Close()
  238. leader0.Close()
  239. seedBroker.Close()
  240. }
  241. func TestProducerFailureRetry(t *testing.T) {
  242. seedBroker := newMockBroker(t, 1)
  243. leader1 := newMockBroker(t, 2)
  244. leader2 := newMockBroker(t, 3)
  245. metadataLeader1 := new(MetadataResponse)
  246. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  247. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  248. seedBroker.Returns(metadataLeader1)
  249. client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
  250. if err != nil {
  251. t.Fatal(err)
  252. }
  253. config := NewProducerConfig()
  254. config.FlushMsgCount = 10
  255. config.AckSuccesses = true
  256. config.RetryBackoff = 0
  257. producer, err := NewProducer(client, config)
  258. if err != nil {
  259. t.Fatal(err)
  260. }
  261. seedBroker.Close()
  262. for i := 0; i < 10; i++ {
  263. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  264. }
  265. prodNotLeader := new(ProduceResponse)
  266. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  267. leader1.Returns(prodNotLeader)
  268. metadataLeader2 := new(MetadataResponse)
  269. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  270. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  271. leader1.Returns(metadataLeader2)
  272. prodSuccess := new(ProduceResponse)
  273. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  274. leader2.Returns(prodSuccess)
  275. for i := 0; i < 10; i++ {
  276. select {
  277. case msg := <-producer.Errors():
  278. t.Error(msg.Err)
  279. if msg.Msg.flags != 0 {
  280. t.Error("Message had flags set")
  281. }
  282. case msg := <-producer.Successes():
  283. if msg.flags != 0 {
  284. t.Error("Message had flags set")
  285. }
  286. }
  287. }
  288. leader1.Close()
  289. for i := 0; i < 10; i++ {
  290. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  291. }
  292. leader2.Returns(prodSuccess)
  293. for i := 0; i < 10; i++ {
  294. select {
  295. case msg := <-producer.Errors():
  296. t.Error(msg.Err)
  297. if msg.Msg.flags != 0 {
  298. t.Error("Message had flags set")
  299. }
  300. case msg := <-producer.Successes():
  301. if msg.flags != 0 {
  302. t.Error("Message had flags set")
  303. }
  304. }
  305. }
  306. leader2.Close()
  307. closeProducer(t, producer)
  308. safeClose(t, client)
  309. }
  310. func TestProducerBrokerBounce(t *testing.T) {
  311. seedBroker := newMockBroker(t, 1)
  312. leader := newMockBroker(t, 2)
  313. leaderAddr := leader.Addr()
  314. metadataResponse := new(MetadataResponse)
  315. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  316. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
  317. seedBroker.Returns(metadataResponse)
  318. client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
  319. if err != nil {
  320. t.Fatal(err)
  321. }
  322. config := NewProducerConfig()
  323. config.FlushMsgCount = 10
  324. config.AckSuccesses = true
  325. config.RetryBackoff = 0
  326. producer, err := NewProducer(client, config)
  327. if err != nil {
  328. t.Fatal(err)
  329. }
  330. for i := 0; i < 10; i++ {
  331. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  332. }
  333. leader.Close() // producer should get EOF
  334. leader = newMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
  335. seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again
  336. prodSuccess := new(ProduceResponse)
  337. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  338. leader.Returns(prodSuccess)
  339. for i := 0; i < 10; i++ {
  340. select {
  341. case msg := <-producer.Errors():
  342. t.Error(msg.Err)
  343. if msg.Msg.flags != 0 {
  344. t.Error("Message had flags set")
  345. }
  346. case msg := <-producer.Successes():
  347. if msg.flags != 0 {
  348. t.Error("Message had flags set")
  349. }
  350. }
  351. }
  352. seedBroker.Close()
  353. leader.Close()
  354. closeProducer(t, producer)
  355. safeClose(t, client)
  356. }
  357. func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
  358. seedBroker := newMockBroker(t, 1)
  359. leader1 := newMockBroker(t, 2)
  360. leader2 := newMockBroker(t, 3)
  361. metadataLeader1 := new(MetadataResponse)
  362. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  363. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  364. seedBroker.Returns(metadataLeader1)
  365. client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
  366. if err != nil {
  367. t.Fatal(err)
  368. }
  369. config := NewProducerConfig()
  370. config.FlushMsgCount = 10
  371. config.AckSuccesses = true
  372. config.MaxRetries = 3
  373. config.RetryBackoff = 0
  374. producer, err := NewProducer(client, config)
  375. if err != nil {
  376. t.Fatal(err)
  377. }
  378. for i := 0; i < 10; i++ {
  379. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  380. }
  381. leader1.Close() // producer should get EOF
  382. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  383. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  384. // ok fine, tell it to go to leader2 finally
  385. metadataLeader2 := new(MetadataResponse)
  386. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  387. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  388. seedBroker.Returns(metadataLeader2)
  389. prodSuccess := new(ProduceResponse)
  390. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  391. leader2.Returns(prodSuccess)
  392. for i := 0; i < 10; i++ {
  393. select {
  394. case msg := <-producer.Errors():
  395. t.Error(msg.Err)
  396. if msg.Msg.flags != 0 {
  397. t.Error("Message had flags set")
  398. }
  399. case msg := <-producer.Successes():
  400. if msg.flags != 0 {
  401. t.Error("Message had flags set")
  402. }
  403. }
  404. }
  405. seedBroker.Close()
  406. leader2.Close()
  407. closeProducer(t, producer)
  408. safeClose(t, client)
  409. }
  410. func TestProducerMultipleRetries(t *testing.T) {
  411. seedBroker := newMockBroker(t, 1)
  412. leader1 := newMockBroker(t, 2)
  413. leader2 := newMockBroker(t, 3)
  414. metadataLeader1 := new(MetadataResponse)
  415. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  416. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
  417. seedBroker.Returns(metadataLeader1)
  418. client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
  419. if err != nil {
  420. t.Fatal(err)
  421. }
  422. config := NewProducerConfig()
  423. config.FlushMsgCount = 10
  424. config.AckSuccesses = true
  425. config.MaxRetries = 4
  426. config.RetryBackoff = 0
  427. producer, err := NewProducer(client, config)
  428. if err != nil {
  429. t.Fatal(err)
  430. }
  431. for i := 0; i < 10; i++ {
  432. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  433. }
  434. prodNotLeader := new(ProduceResponse)
  435. prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotLeaderForPartition)
  436. leader1.Returns(prodNotLeader)
  437. metadataLeader2 := new(MetadataResponse)
  438. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  439. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
  440. seedBroker.Returns(metadataLeader2)
  441. leader2.Returns(prodNotLeader)
  442. seedBroker.Returns(metadataLeader1)
  443. leader1.Returns(prodNotLeader)
  444. seedBroker.Returns(metadataLeader1)
  445. leader1.Returns(prodNotLeader)
  446. seedBroker.Returns(metadataLeader2)
  447. prodSuccess := new(ProduceResponse)
  448. prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
  449. leader2.Returns(prodSuccess)
  450. for i := 0; i < 10; i++ {
  451. select {
  452. case msg := <-producer.Errors():
  453. t.Error(msg.Err)
  454. if msg.Msg.flags != 0 {
  455. t.Error("Message had flags set")
  456. }
  457. case msg := <-producer.Successes():
  458. if msg.flags != 0 {
  459. t.Error("Message had flags set")
  460. }
  461. }
  462. }
  463. for i := 0; i < 10; i++ {
  464. producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  465. }
  466. leader2.Returns(prodSuccess)
  467. for i := 0; i < 10; i++ {
  468. select {
  469. case msg := <-producer.Errors():
  470. t.Error(msg.Err)
  471. if msg.Msg.flags != 0 {
  472. t.Error("Message had flags set")
  473. }
  474. case msg := <-producer.Successes():
  475. if msg.flags != 0 {
  476. t.Error("Message had flags set")
  477. }
  478. }
  479. }
  480. seedBroker.Close()
  481. leader1.Close()
  482. leader2.Close()
  483. closeProducer(t, producer)
  484. safeClose(t, client)
  485. }
  486. func ExampleProducer() {
  487. client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
  488. if err != nil {
  489. panic(err)
  490. } else {
  491. fmt.Println("> connected")
  492. }
  493. defer client.Close()
  494. producer, err := NewProducer(client, nil)
  495. if err != nil {
  496. panic(err)
  497. }
  498. defer producer.Close()
  499. for {
  500. select {
  501. case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
  502. fmt.Println("> message queued")
  503. case err := <-producer.Errors():
  504. panic(err.Err)
  505. }
  506. }
  507. }
  508. func ExampleSimpleProducer() {
  509. client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
  510. if err != nil {
  511. panic(err)
  512. } else {
  513. fmt.Println("> connected")
  514. }
  515. defer client.Close()
  516. producer, err := NewSimpleProducer(client, nil)
  517. if err != nil {
  518. panic(err)
  519. }
  520. defer producer.Close()
  521. for {
  522. err = producer.SendMessage("my_topic", nil, StringEncoder("testing 123"))
  523. if err != nil {
  524. panic(err)
  525. } else {
  526. fmt.Println("> message sent")
  527. }
  528. }
  529. }