functional_test.go

package sarama

import (
	"fmt"
	"log"
	"net"
	"os"
	"strings"
	"sync"
	"testing"
	"time"
)

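// TestBatchSize is the number of messages produced by each functional test.
// The Vagrant* constants are the default Kafka and Zookeeper addresses used
// when KAFKA_PEERS is not set in the environment.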
const (
	TestBatchSize = 1000

	VagrantKafkaPeers     = "192.168.100.67:6667,192.168.100.67:6668,192.168.100.67:6669,192.168.100.67:6670,192.168.100.67:6671"
	VagrantZookeeperPeers = "192.168.100.67:2181,192.168.100.67:2182,192.168.100.67:2183,192.168.100.67:2184,192.168.100.67:2185"
)

var (
	kafkaIsAvailable, kafkaShouldBeAvailable bool
	kafkaBrokers                             []string
)

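// init determines whether a Kafka broker is reachable on the first address in
// KAFKA_PEERS (falling back to the Vagrant defaults), records whether Kafka is
// expected to be available (CI), and enables sarama's logger when DEBUG=true.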
func init() {
	kafkaPeers := os.Getenv("KAFKA_PEERS")
	if kafkaPeers == "" {
		kafkaPeers = VagrantKafkaPeers
	}
	kafkaBrokers = strings.Split(kafkaPeers, ",")

	if c, err := net.DialTimeout("tcp", kafkaBrokers[0], 5*time.Second); err == nil {
		if err = c.Close(); err == nil {
			kafkaIsAvailable = true
		}
	}

	kafkaShouldBeAvailable = os.Getenv("CI") != ""

	if os.Getenv("DEBUG") == "true" {
		Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}
}

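// checkKafkaAvailability skips the test when no broker is reachable, but fails
// it instead when running in CI, where Kafka is expected to be up.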
func checkKafkaAvailability(t *testing.T) {
	if !kafkaIsAvailable {
		if kafkaShouldBeAvailable {
			t.Fatalf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
		} else {
			t.Skipf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
		}
	}
}

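// TestFuncConnectionFailure verifies that connecting to an address with no
// broker listening returns ErrOutOfBrokers.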
func TestFuncConnectionFailure(t *testing.T) {
	config := NewConfig()
	config.Metadata.Retry.Max = 1

	_, err := NewClient([]string{"localhost:9000"}, config)
	if err != ErrOutOfBrokers {
		t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err)
	}
}

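// TestFuncClientMetadata exercises metadata lookups against the live cluster:
// unknown topics must return ErrUnknownTopicOrPartition, and the test topics
// must report the expected partition counts.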
func TestFuncClientMetadata(t *testing.T) {
	checkKafkaAvailability(t)

	config := NewConfig()
	config.Metadata.Retry.Max = 1
	config.Metadata.Retry.Backoff = 10 * time.Millisecond
	client, err := NewClient(kafkaBrokers, config)
	if err != nil {
		t.Fatal(err)
	}

	if err := client.RefreshMetadata("unknown_topic"); err != ErrUnknownTopicOrPartition {
		t.Error("Expected ErrUnknownTopicOrPartition, got", err)
	}

	if _, err := client.Leader("unknown_topic", 0); err != ErrUnknownTopicOrPartition {
		t.Error("Expected ErrUnknownTopicOrPartition, got", err)
	}

	if _, err := client.Replicas("invalid/topic", 0); err != ErrUnknownTopicOrPartition {
		t.Error("Expected ErrUnknownTopicOrPartition, got", err)
	}

	partitions, err := client.Partitions("multi_partition")
	if err != nil {
		t.Error(err)
	}
	if len(partitions) != 2 {
		t.Errorf("Expected multi_partition topic to have 2 partitions, found %v", partitions)
	}

	partitions, err = client.Partitions("single_partition")
	if err != nil {
		t.Error(err)
	}
	if len(partitions) != 1 {
		t.Errorf("Expected single_partition topic to have 1 partition, found %v", partitions)
	}

	safeClose(t, client)
}

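// The TestFuncProducing* variants below run testProducingMessages with
// different producer settings: defaults, GZIP and Snappy compression,
// fire-and-forget acks, and batched flushing.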
func TestFuncProducing(t *testing.T) {
	config := NewConfig()
	testProducingMessages(t, config)
}

func TestFuncProducingGzip(t *testing.T) {
	config := NewConfig()
	config.Producer.Compression = CompressionGZIP
	testProducingMessages(t, config)
}

func TestFuncProducingSnappy(t *testing.T) {
	config := NewConfig()
	config.Producer.Compression = CompressionSnappy
	testProducingMessages(t, config)
}

func TestFuncProducingNoResponse(t *testing.T) {
	config := NewConfig()
	config.Producer.RequiredAcks = NoResponse
	testProducingMessages(t, config)
}

func TestFuncProducingFlushing(t *testing.T) {
	config := NewConfig()
	config.Producer.Flush.Messages = TestBatchSize / 8
	config.Producer.Flush.Frequency = 250 * time.Millisecond
	testProducingMessages(t, config)
}

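// TestFuncMultiPartitionProduce produces TestBatchSize messages to the
// multi_partition topic from concurrent goroutines and waits for every
// success or error to come back before closing the producer.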
func TestFuncMultiPartitionProduce(t *testing.T) {
	checkKafkaAvailability(t)

	config := NewConfig()
	config.ChannelBufferSize = 20
	config.Producer.Flush.Frequency = 50 * time.Millisecond
	config.Producer.Flush.Messages = 200
	config.Producer.Return.Successes = true
	producer, err := NewAsyncProducer(kafkaBrokers, config)
	if err != nil {
		t.Fatal(err)
	}

	var wg sync.WaitGroup
	wg.Add(TestBatchSize)

	for i := 1; i <= TestBatchSize; i++ {
		go func(i int, w *sync.WaitGroup) {
			defer w.Done()
			msg := &ProducerMessage{Topic: "multi_partition", Key: nil, Value: StringEncoder(fmt.Sprintf("hur %d", i))}
			producer.Input() <- msg
			select {
			case ret := <-producer.Errors():
				// t.Fatal must only be called from the goroutine running the
				// test, so report the error here and fail after wg.Wait().
				t.Error(ret.Err)
			case <-producer.Successes():
			}
		}(i, &wg)
	}

	wg.Wait()
	if err := producer.Close(); err != nil {
		t.Error(err)
	}
}

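// TestProducingToInvalidTopic checks that producing to a topic name Kafka
// rejects ("in/valid") surfaces ErrUnknownTopicOrPartition from the sync
// producer, on the first attempt and again on a retry.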
func TestProducingToInvalidTopic(t *testing.T) {
	checkKafkaAvailability(t)

	producer, err := NewSyncProducer(kafkaBrokers, nil)
	if err != nil {
		t.Fatal(err)
	}

	if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
		t.Error("Expected ErrUnknownTopicOrPartition, found", err)
	}

	if _, _, err := producer.SendMessage(&ProducerMessage{Topic: "in/valid"}); err != ErrUnknownTopicOrPartition {
		t.Error("Expected ErrUnknownTopicOrPartition, found", err)
	}

	safeClose(t, producer)
}

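// testProducingMessages is the shared body of the TestFuncProducing* tests: it
// produces TestBatchSize messages to single_partition with the given config,
// waits for all responses, and then consumes the messages back in order,
// verifying their contents.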
func testProducingMessages(t *testing.T, config *Config) {
	checkKafkaAvailability(t)

	config.Producer.Return.Successes = true
	config.Consumer.Return.Errors = true

	client, err := NewClient(kafkaBrokers, config)
	if err != nil {
		t.Fatal(err)
	}

	master, err := NewConsumerFromClient(client)
	if err != nil {
		t.Fatal(err)
	}
	consumer, err := master.ConsumePartition("single_partition", 0, OffsetNewest)
	if err != nil {
		t.Fatal(err)
	}

	producer, err := NewAsyncProducerFromClient(client)
	if err != nil {
		t.Fatal(err)
	}

	expectedResponses := TestBatchSize
	for i := 1; i <= TestBatchSize; {
		msg := &ProducerMessage{Topic: "single_partition", Key: nil, Value: StringEncoder(fmt.Sprintf("testing %d", i))}
		select {
		case producer.Input() <- msg:
			i++
		case ret := <-producer.Errors():
			t.Fatal(ret.Err)
		case <-producer.Successes():
			expectedResponses--
		}
	}
	for expectedResponses > 0 {
		select {
		case ret := <-producer.Errors():
			t.Fatal(ret.Err)
		case <-producer.Successes():
			expectedResponses--
		}
	}
	safeClose(t, producer)

	for i := 1; i <= TestBatchSize; i++ {
		select {
		case <-time.After(10 * time.Second):
			t.Fatal("No events received in the last 10 seconds.")
		case err := <-consumer.Errors():
			t.Error(err)
		case message := <-consumer.Messages():
			if string(message.Value) != fmt.Sprintf("testing %d", i) {
				t.Fatalf("Unexpected message with index %d: %s", i, message.Value)
			}
		}
	}

	safeClose(t, consumer)
	safeClose(t, client)
}