functional_test.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. package sarama
  2. import (
  3. "fmt"
  4. "net"
  5. "os"
  6. "strings"
  7. "sync"
  8. "testing"
  9. "time"
  10. )
  11. const (
  12. TestBatchSize = 1000
  13. VagrantKafkaPeers = "192.168.100.67:6667,192.168.100.67:6668,192.168.100.67:6669,192.168.100.67:6670,192.168.100.67:6671"
  14. VagrantZookeeperPeers = "192.168.100.67:2181,192.168.100.67:2182,192.168.100.67:2183,192.168.100.67:2184,192.168.100.67:2185"
  15. )
  16. var (
  17. kafkaIsAvailable, kafkaShouldBeAvailable bool
  18. kafkaBrokers []string
  19. )
  20. func init() {
  21. kafkaPeers := os.Getenv("KAFKA_PEERS")
  22. if kafkaPeers == "" {
  23. kafkaPeers = VagrantKafkaPeers
  24. }
  25. kafkaBrokers = strings.Split(kafkaPeers, ",")
  26. if c, err := net.Dial("tcp", kafkaBrokers[0]); err == nil {
  27. if err = c.Close(); err == nil {
  28. kafkaIsAvailable = true
  29. }
  30. }
  31. kafkaShouldBeAvailable = os.Getenv("CI") != ""
  32. }
  33. func checkKafkaAvailability(t *testing.T) {
  34. if !kafkaIsAvailable {
  35. if kafkaShouldBeAvailable {
  36. t.Fatalf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
  37. } else {
  38. t.Skipf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
  39. }
  40. }
  41. }
  42. func TestFuncConnectionFailure(t *testing.T) {
  43. config := NewConfig()
  44. config.Metadata.Retry.Max = 1
  45. _, err := NewClient([]string{"localhost:9000"}, config)
  46. if err != ErrOutOfBrokers {
  47. t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err)
  48. }
  49. }
  50. func TestFuncClientMetadata(t *testing.T) {
  51. checkKafkaAvailability(t)
  52. config := NewConfig()
  53. config.Metadata.Retry.Max = 1
  54. config.Metadata.Retry.Backoff = 10 * time.Millisecond
  55. client, err := NewClient(kafkaBrokers, config)
  56. if err != nil {
  57. t.Fatal(err)
  58. }
  59. if err := client.RefreshMetadata("unknown_topic"); err != ErrUnknownTopicOrPartition {
  60. t.Error("Expected ErrUnknownTopicOrPartition, got", err)
  61. }
  62. if _, err := client.Leader("unknown_topic", 0); err != ErrUnknownTopicOrPartition {
  63. t.Error("Expected ErrUnknownTopicOrPartition, got", err)
  64. }
  65. if _, err := client.Replicas("invalid/topic", 0); err != ErrUnknownTopicOrPartition {
  66. t.Error("Expected ErrUnknownTopicOrPartition, got", err)
  67. }
  68. partitions, err := client.Partitions("multi_partition")
  69. if err != nil {
  70. t.Error(err)
  71. }
  72. if len(partitions) != 2 {
  73. t.Errorf("Expected multi_partition topic to have 2 partitions, found %v", partitions)
  74. }
  75. partitions, err = client.Partitions("single_partition")
  76. if err != nil {
  77. t.Error(err)
  78. }
  79. if len(partitions) != 1 {
  80. t.Errorf("Expected single_partition topic to have 1 partitions, found %v", partitions)
  81. }
  82. safeClose(t, client)
  83. }
  84. func TestFuncProducing(t *testing.T) {
  85. config := NewConfig()
  86. testProducingMessages(t, config)
  87. }
  88. func TestFuncProducingGzip(t *testing.T) {
  89. config := NewConfig()
  90. config.Producer.Compression = CompressionGZIP
  91. testProducingMessages(t, config)
  92. }
  93. func TestFuncProducingSnappy(t *testing.T) {
  94. config := NewConfig()
  95. config.Producer.Compression = CompressionSnappy
  96. testProducingMessages(t, config)
  97. }
  98. func TestFuncProducingNoResponse(t *testing.T) {
  99. config := NewConfig()
  100. config.Producer.RequiredAcks = NoResponse
  101. testProducingMessages(t, config)
  102. }
  103. func TestFuncProducingFlushing(t *testing.T) {
  104. config := NewConfig()
  105. config.Producer.Flush.Messages = TestBatchSize / 8
  106. config.Producer.Flush.Frequency = 250 * time.Millisecond
  107. testProducingMessages(t, config)
  108. }
  109. func TestFuncMultiPartitionProduce(t *testing.T) {
  110. checkKafkaAvailability(t)
  111. config := NewConfig()
  112. config.ChannelBufferSize = 20
  113. config.Producer.Flush.Frequency = 50 * time.Millisecond
  114. config.Producer.Flush.Messages = 200
  115. config.Producer.Return.Successes = true
  116. producer, err := NewAsyncProducer(kafkaBrokers, config)
  117. if err != nil {
  118. t.Fatal(err)
  119. }
  120. var wg sync.WaitGroup
  121. wg.Add(TestBatchSize)
  122. for i := 1; i <= TestBatchSize; i++ {
  123. go func(i int, w *sync.WaitGroup) {
  124. defer w.Done()
  125. msg := &ProducerMessage{Topic: "multi_partition", Key: nil, Value: StringEncoder(fmt.Sprintf("hur %d", i))}
  126. producer.Input() <- msg
  127. select {
  128. case ret := <-producer.Errors():
  129. t.Fatal(ret.Err)
  130. case <-producer.Successes():
  131. }
  132. }(i, &wg)
  133. }
  134. wg.Wait()
  135. if err := producer.Close(); err != nil {
  136. t.Error(err)
  137. }
  138. }
  139. func TestProducingToInvalidTopic(t *testing.T) {
  140. checkKafkaAvailability(t)
  141. producer, err := NewSyncProducer(kafkaBrokers, nil)
  142. if err != nil {
  143. t.Fatal(err)
  144. }
  145. if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
  146. t.Error("Expected ErrUnknownTopicOrPartition, found", err)
  147. }
  148. if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
  149. t.Error("Expected ErrUnknownTopicOrPartition, found", err)
  150. }
  151. safeClose(t, producer)
  152. }
  153. func testProducingMessages(t *testing.T, config *Config) {
  154. checkKafkaAvailability(t)
  155. config.Producer.Return.Successes = true
  156. config.Consumer.Return.Errors = true
  157. client, err := NewClient(kafkaBrokers, config)
  158. if err != nil {
  159. t.Fatal(err)
  160. }
  161. master, err := NewConsumerFromClient(client)
  162. if err != nil {
  163. t.Fatal(err)
  164. }
  165. consumer, err := master.ConsumePartition("single_partition", 0, OffsetNewest)
  166. if err != nil {
  167. t.Fatal(err)
  168. }
  169. producer, err := NewAsyncProducerFromClient(client)
  170. if err != nil {
  171. t.Fatal(err)
  172. }
  173. expectedResponses := TestBatchSize
  174. for i := 1; i <= TestBatchSize; {
  175. msg := &ProducerMessage{Topic: "single_partition", Key: nil, Value: StringEncoder(fmt.Sprintf("testing %d", i))}
  176. select {
  177. case producer.Input() <- msg:
  178. i++
  179. case ret := <-producer.Errors():
  180. t.Fatal(ret.Err)
  181. case <-producer.Successes():
  182. expectedResponses--
  183. }
  184. }
  185. for expectedResponses > 0 {
  186. select {
  187. case ret := <-producer.Errors():
  188. t.Fatal(ret.Err)
  189. case <-producer.Successes():
  190. expectedResponses--
  191. }
  192. }
  193. safeClose(t, producer)
  194. for i := 1; i <= TestBatchSize; i++ {
  195. select {
  196. case <-time.After(10 * time.Second):
  197. t.Fatal("Not received any more events in the last 10 seconds.")
  198. case err := <-consumer.Errors():
  199. t.Error(err)
  200. case message := <-consumer.Messages():
  201. if string(message.Value) != fmt.Sprintf("testing %d", i) {
  202. t.Fatalf("Unexpected message with index %d: %s", i, message.Value)
  203. }
  204. }
  205. }
  206. safeClose(t, consumer)
  207. safeClose(t, client)
  208. }