producer_test.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590
  1. package sarama
  2. import (
  3. "fmt"
  4. "sync"
  5. "testing"
  6. )
  7. const TestMessage = "ABC THE MESSAGE"
  8. func TestDefaultProducerConfigValidates(t *testing.T) {
  9. config := NewProducerConfig()
  10. if err := config.Validate(); err != nil {
  11. t.Error(err)
  12. }
  13. }
  14. func TestSimpleProducer(t *testing.T) {
  15. seedBroker := NewMockBroker(t, 1)
  16. leader := NewMockBroker(t, 2)
  17. metadataResponse := new(MetadataResponse)
  18. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  19. metadataResponse.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
  20. seedBroker.Returns(metadataResponse)
  21. prodSuccess := new(ProduceResponse)
  22. prodSuccess.AddTopicPartition("my_topic", 0, NoError)
  23. for i := 0; i < 10; i++ {
  24. leader.Returns(prodSuccess)
  25. }
  26. client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
  27. if err != nil {
  28. t.Fatal(err)
  29. }
  30. producer, err := NewSimpleProducer(client, nil)
  31. if err != nil {
  32. t.Fatal(err)
  33. }
  34. for i := 0; i < 10; i++ {
  35. err = producer.SendMessage("my_topic", nil, StringEncoder(TestMessage))
  36. if err != nil {
  37. t.Error(err)
  38. }
  39. }
  40. safeClose(t, producer)
  41. safeClose(t, client)
  42. leader.Close()
  43. seedBroker.Close()
  44. }
  45. func TestConcurrentSimpleProducer(t *testing.T) {
  46. seedBroker := NewMockBroker(t, 1)
  47. leader := NewMockBroker(t, 2)
  48. metadataResponse := new(MetadataResponse)
  49. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  50. metadataResponse.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
  51. seedBroker.Returns(metadataResponse)
  52. prodSuccess := new(ProduceResponse)
  53. prodSuccess.AddTopicPartition("my_topic", 0, NoError)
  54. leader.Returns(prodSuccess)
  55. client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
  56. if err != nil {
  57. t.Fatal(err)
  58. }
  59. config := NewProducerConfig()
  60. config.FlushMsgCount = 100
  61. producer, err := NewSimpleProducer(client, config)
  62. if err != nil {
  63. t.Fatal(err)
  64. }
  65. wg := sync.WaitGroup{}
  66. for i := 0; i < 100; i++ {
  67. wg.Add(1)
  68. go func() {
  69. err := producer.SendMessage("my_topic", nil, StringEncoder(TestMessage))
  70. if err != nil {
  71. t.Error(err)
  72. }
  73. wg.Done()
  74. }()
  75. }
  76. wg.Wait()
  77. safeClose(t, producer)
  78. safeClose(t, client)
  79. leader.Close()
  80. seedBroker.Close()
  81. }
  82. func TestProducer(t *testing.T) {
  83. seedBroker := NewMockBroker(t, 1)
  84. leader := NewMockBroker(t, 2)
  85. metadataResponse := new(MetadataResponse)
  86. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  87. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
  88. seedBroker.Returns(metadataResponse)
  89. prodSuccess := new(ProduceResponse)
  90. prodSuccess.AddTopicPartition("my_topic", 0, NoError)
  91. leader.Returns(prodSuccess)
  92. client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
  93. if err != nil {
  94. t.Fatal(err)
  95. }
  96. config := NewProducerConfig()
  97. config.FlushMsgCount = 10
  98. config.AckSuccesses = true
  99. producer, err := NewProducer(client, config)
  100. if err != nil {
  101. t.Fatal(err)
  102. }
  103. for i := 0; i < 10; i++ {
  104. producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Metadata: i}
  105. }
  106. for i := 0; i < 10; i++ {
  107. select {
  108. case msg := <-producer.Errors():
  109. t.Error(msg.Err)
  110. if msg.Msg.flags != 0 {
  111. t.Error("Message had flags set")
  112. }
  113. case msg := <-producer.Successes():
  114. if msg.flags != 0 {
  115. t.Error("Message had flags set")
  116. }
  117. if msg.Metadata.(int) != i {
  118. t.Error("Message metadata did not match")
  119. }
  120. }
  121. }
  122. safeClose(t, producer)
  123. safeClose(t, client)
  124. leader.Close()
  125. seedBroker.Close()
  126. }
  127. func TestProducerMultipleFlushes(t *testing.T) {
  128. seedBroker := NewMockBroker(t, 1)
  129. leader := NewMockBroker(t, 2)
  130. metadataResponse := new(MetadataResponse)
  131. metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
  132. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
  133. seedBroker.Returns(metadataResponse)
  134. prodSuccess := new(ProduceResponse)
  135. prodSuccess.AddTopicPartition("my_topic", 0, NoError)
  136. leader.Returns(prodSuccess)
  137. leader.Returns(prodSuccess)
  138. leader.Returns(prodSuccess)
  139. client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
  140. if err != nil {
  141. t.Fatal(err)
  142. }
  143. config := NewProducerConfig()
  144. config.FlushMsgCount = 5
  145. config.AckSuccesses = true
  146. producer, err := NewProducer(client, config)
  147. if err != nil {
  148. t.Fatal(err)
  149. }
  150. for flush := 0; flush < 3; flush++ {
  151. for i := 0; i < 5; i++ {
  152. producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  153. }
  154. for i := 0; i < 5; i++ {
  155. select {
  156. case msg := <-producer.Errors():
  157. t.Error(msg.Err)
  158. if msg.Msg.flags != 0 {
  159. t.Error("Message had flags set")
  160. }
  161. case msg := <-producer.Successes():
  162. if msg.flags != 0 {
  163. t.Error("Message had flags set")
  164. }
  165. }
  166. }
  167. }
  168. safeClose(t, producer)
  169. safeClose(t, client)
  170. leader.Close()
  171. seedBroker.Close()
  172. }
  173. func TestProducerMultipleBrokers(t *testing.T) {
  174. seedBroker := NewMockBroker(t, 1)
  175. leader0 := NewMockBroker(t, 2)
  176. leader1 := NewMockBroker(t, 3)
  177. metadataResponse := new(MetadataResponse)
  178. metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
  179. metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
  180. metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, NoError)
  181. metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, NoError)
  182. seedBroker.Returns(metadataResponse)
  183. prodResponse0 := new(ProduceResponse)
  184. prodResponse0.AddTopicPartition("my_topic", 0, NoError)
  185. leader0.Returns(prodResponse0)
  186. prodResponse1 := new(ProduceResponse)
  187. prodResponse1.AddTopicPartition("my_topic", 1, NoError)
  188. leader1.Returns(prodResponse1)
  189. client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
  190. if err != nil {
  191. t.Fatal(err)
  192. }
  193. config := NewProducerConfig()
  194. config.FlushMsgCount = 5
  195. config.AckSuccesses = true
  196. config.Partitioner = NewRoundRobinPartitioner
  197. producer, err := NewProducer(client, config)
  198. if err != nil {
  199. t.Fatal(err)
  200. }
  201. for i := 0; i < 10; i++ {
  202. producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  203. }
  204. for i := 0; i < 10; i++ {
  205. select {
  206. case msg := <-producer.Errors():
  207. t.Error(msg.Err)
  208. if msg.Msg.flags != 0 {
  209. t.Error("Message had flags set")
  210. }
  211. case msg := <-producer.Successes():
  212. if msg.flags != 0 {
  213. t.Error("Message had flags set")
  214. }
  215. }
  216. }
  217. safeClose(t, producer)
  218. safeClose(t, client)
  219. leader1.Close()
  220. leader0.Close()
  221. seedBroker.Close()
  222. }
  223. func TestProducerFailureRetry(t *testing.T) {
  224. seedBroker := NewMockBroker(t, 1)
  225. leader1 := NewMockBroker(t, 2)
  226. leader2 := NewMockBroker(t, 3)
  227. metadataLeader1 := new(MetadataResponse)
  228. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  229. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
  230. seedBroker.Returns(metadataLeader1)
  231. client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
  232. if err != nil {
  233. t.Fatal(err)
  234. }
  235. config := NewProducerConfig()
  236. config.FlushMsgCount = 10
  237. config.AckSuccesses = true
  238. config.RetryBackoff = 0
  239. producer, err := NewProducer(client, config)
  240. if err != nil {
  241. t.Fatal(err)
  242. }
  243. seedBroker.Close()
  244. for i := 0; i < 10; i++ {
  245. producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  246. }
  247. prodNotLeader := new(ProduceResponse)
  248. prodNotLeader.AddTopicPartition("my_topic", 0, NotLeaderForPartition)
  249. leader1.Returns(prodNotLeader)
  250. metadataLeader2 := new(MetadataResponse)
  251. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  252. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, NoError)
  253. leader1.Returns(metadataLeader2)
  254. prodSuccess := new(ProduceResponse)
  255. prodSuccess.AddTopicPartition("my_topic", 0, NoError)
  256. leader2.Returns(prodSuccess)
  257. for i := 0; i < 10; i++ {
  258. select {
  259. case msg := <-producer.Errors():
  260. t.Error(msg.Err)
  261. if msg.Msg.flags != 0 {
  262. t.Error("Message had flags set")
  263. }
  264. case msg := <-producer.Successes():
  265. if msg.flags != 0 {
  266. t.Error("Message had flags set")
  267. }
  268. }
  269. }
  270. leader1.Close()
  271. for i := 0; i < 10; i++ {
  272. producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  273. }
  274. leader2.Returns(prodSuccess)
  275. for i := 0; i < 10; i++ {
  276. select {
  277. case msg := <-producer.Errors():
  278. t.Error(msg.Err)
  279. if msg.Msg.flags != 0 {
  280. t.Error("Message had flags set")
  281. }
  282. case msg := <-producer.Successes():
  283. if msg.flags != 0 {
  284. t.Error("Message had flags set")
  285. }
  286. }
  287. }
  288. leader2.Close()
  289. safeClose(t, producer)
  290. safeClose(t, client)
  291. }
  292. func TestProducerBrokerBounce(t *testing.T) {
  293. seedBroker := NewMockBroker(t, 1)
  294. leader := NewMockBroker(t, 2)
  295. leaderAddr := leader.Addr()
  296. metadataResponse := new(MetadataResponse)
  297. metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
  298. metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
  299. seedBroker.Returns(metadataResponse)
  300. client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
  301. if err != nil {
  302. t.Fatal(err)
  303. }
  304. config := NewProducerConfig()
  305. config.FlushMsgCount = 10
  306. config.AckSuccesses = true
  307. config.RetryBackoff = 0
  308. producer, err := NewProducer(client, config)
  309. if err != nil {
  310. t.Fatal(err)
  311. }
  312. for i := 0; i < 10; i++ {
  313. producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  314. }
  315. leader.Close() // producer should get EOF
  316. leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
  317. seedBroker.Returns(metadataResponse) // tell it to go to broker 2 again
  318. prodSuccess := new(ProduceResponse)
  319. prodSuccess.AddTopicPartition("my_topic", 0, NoError)
  320. leader.Returns(prodSuccess)
  321. for i := 0; i < 10; i++ {
  322. select {
  323. case msg := <-producer.Errors():
  324. t.Error(msg.Err)
  325. if msg.Msg.flags != 0 {
  326. t.Error("Message had flags set")
  327. }
  328. case msg := <-producer.Successes():
  329. if msg.flags != 0 {
  330. t.Error("Message had flags set")
  331. }
  332. }
  333. }
  334. seedBroker.Close()
  335. leader.Close()
  336. safeClose(t, producer)
  337. safeClose(t, client)
  338. }
  339. func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
  340. seedBroker := NewMockBroker(t, 1)
  341. leader1 := NewMockBroker(t, 2)
  342. leader2 := NewMockBroker(t, 3)
  343. metadataLeader1 := new(MetadataResponse)
  344. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  345. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
  346. seedBroker.Returns(metadataLeader1)
  347. client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
  348. if err != nil {
  349. t.Fatal(err)
  350. }
  351. config := NewProducerConfig()
  352. config.FlushMsgCount = 10
  353. config.AckSuccesses = true
  354. config.MaxRetries = 3
  355. config.RetryBackoff = 0
  356. producer, err := NewProducer(client, config)
  357. if err != nil {
  358. t.Fatal(err)
  359. }
  360. for i := 0; i < 10; i++ {
  361. producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  362. }
  363. leader1.Close() // producer should get EOF
  364. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  365. seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
  366. // ok fine, tell it to go to leader2 finally
  367. metadataLeader2 := new(MetadataResponse)
  368. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  369. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, NoError)
  370. seedBroker.Returns(metadataLeader2)
  371. prodSuccess := new(ProduceResponse)
  372. prodSuccess.AddTopicPartition("my_topic", 0, NoError)
  373. leader2.Returns(prodSuccess)
  374. for i := 0; i < 10; i++ {
  375. select {
  376. case msg := <-producer.Errors():
  377. t.Error(msg.Err)
  378. if msg.Msg.flags != 0 {
  379. t.Error("Message had flags set")
  380. }
  381. case msg := <-producer.Successes():
  382. if msg.flags != 0 {
  383. t.Error("Message had flags set")
  384. }
  385. }
  386. }
  387. seedBroker.Close()
  388. leader2.Close()
  389. safeClose(t, producer)
  390. safeClose(t, client)
  391. }
  392. func TestProducerMultipleRetries(t *testing.T) {
  393. seedBroker := NewMockBroker(t, 1)
  394. leader1 := NewMockBroker(t, 2)
  395. leader2 := NewMockBroker(t, 3)
  396. metadataLeader1 := new(MetadataResponse)
  397. metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
  398. metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
  399. seedBroker.Returns(metadataLeader1)
  400. client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
  401. if err != nil {
  402. t.Fatal(err)
  403. }
  404. config := NewProducerConfig()
  405. config.FlushMsgCount = 10
  406. config.AckSuccesses = true
  407. config.MaxRetries = 4
  408. config.RetryBackoff = 0
  409. producer, err := NewProducer(client, config)
  410. if err != nil {
  411. t.Fatal(err)
  412. }
  413. for i := 0; i < 10; i++ {
  414. producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  415. }
  416. prodNotLeader := new(ProduceResponse)
  417. prodNotLeader.AddTopicPartition("my_topic", 0, NotLeaderForPartition)
  418. leader1.Returns(prodNotLeader)
  419. metadataLeader2 := new(MetadataResponse)
  420. metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
  421. metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, NoError)
  422. seedBroker.Returns(metadataLeader2)
  423. leader2.Returns(prodNotLeader)
  424. seedBroker.Returns(metadataLeader1)
  425. leader1.Returns(prodNotLeader)
  426. seedBroker.Returns(metadataLeader1)
  427. leader1.Returns(prodNotLeader)
  428. seedBroker.Returns(metadataLeader2)
  429. prodSuccess := new(ProduceResponse)
  430. prodSuccess.AddTopicPartition("my_topic", 0, NoError)
  431. leader2.Returns(prodSuccess)
  432. for i := 0; i < 10; i++ {
  433. select {
  434. case msg := <-producer.Errors():
  435. t.Error(msg.Err)
  436. if msg.Msg.flags != 0 {
  437. t.Error("Message had flags set")
  438. }
  439. case msg := <-producer.Successes():
  440. if msg.flags != 0 {
  441. t.Error("Message had flags set")
  442. }
  443. }
  444. }
  445. for i := 0; i < 10; i++ {
  446. producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
  447. }
  448. leader2.Returns(prodSuccess)
  449. for i := 0; i < 10; i++ {
  450. select {
  451. case msg := <-producer.Errors():
  452. t.Error(msg.Err)
  453. if msg.Msg.flags != 0 {
  454. t.Error("Message had flags set")
  455. }
  456. case msg := <-producer.Successes():
  457. if msg.flags != 0 {
  458. t.Error("Message had flags set")
  459. }
  460. }
  461. }
  462. seedBroker.Close()
  463. leader1.Close()
  464. leader2.Close()
  465. safeClose(t, producer)
  466. safeClose(t, client)
  467. }
  468. func ExampleProducer() {
  469. client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
  470. if err != nil {
  471. panic(err)
  472. } else {
  473. fmt.Println("> connected")
  474. }
  475. defer client.Close()
  476. producer, err := NewProducer(client, nil)
  477. if err != nil {
  478. panic(err)
  479. }
  480. defer producer.Close()
  481. for {
  482. select {
  483. case producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
  484. fmt.Println("> message queued")
  485. case err := <-producer.Errors():
  486. panic(err.Err)
  487. }
  488. }
  489. }
  490. func ExampleSimpleProducer() {
  491. client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
  492. if err != nil {
  493. panic(err)
  494. } else {
  495. fmt.Println("> connected")
  496. }
  497. defer client.Close()
  498. producer, err := NewSimpleProducer(client, nil)
  499. if err != nil {
  500. panic(err)
  501. }
  502. defer producer.Close()
  503. for {
  504. err = producer.SendMessage("my_topic", nil, StringEncoder("testing 123"))
  505. if err != nil {
  506. panic(err)
  507. } else {
  508. fmt.Println("> message sent")
  509. }
  510. }
  511. }