consumer_test.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package mocks
  2. import (
  3. "testing"
  4. "github.com/Shopify/sarama"
  5. )
  6. func TestMockConsumerImplementsConsumerInterface(t *testing.T) {
  7. var c interface{} = &Consumer{}
  8. if _, ok := c.(sarama.Consumer); !ok {
  9. t.Error("The mock consumer should implement the sarama.Consumer interface.")
  10. }
  11. var pc interface{} = &PartitionConsumer{}
  12. if _, ok := pc.(sarama.PartitionConsumer); !ok {
  13. t.Error("The mock partitionconsumer should implement the sarama.PartitionConsumer interface.")
  14. }
  15. }
  16. func TestConsumerHandlesExpectations(t *testing.T) {
  17. consumer := NewConsumer(t, nil)
  18. defer func() {
  19. if err := consumer.Close(); err != nil {
  20. t.Error(err)
  21. }
  22. }()
  23. consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world")})
  24. consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldError(sarama.ErrOutOfBrokers)
  25. consumer.ExpectConsumePartition("test", 1, sarama.OffsetOldest).YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world again")})
  26. consumer.ExpectConsumePartition("other", 0, AnyOffset).YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello other")})
  27. pc_test0, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
  28. if err != nil {
  29. t.Fatal(err)
  30. }
  31. test0_msg := <-pc_test0.Messages()
  32. if test0_msg.Topic != "test" || test0_msg.Partition != 0 || string(test0_msg.Value) != "hello world" {
  33. t.Error("Message was not as expected:", test0_msg)
  34. }
  35. test0_err := <-pc_test0.Errors()
  36. if test0_err.Err != sarama.ErrOutOfBrokers {
  37. t.Error("Expected sarama.ErrOutOfBrokers, found:", test0_err.Err)
  38. }
  39. pc_test1, err := consumer.ConsumePartition("test", 1, sarama.OffsetOldest)
  40. if err != nil {
  41. t.Fatal(err)
  42. }
  43. test1_msg := <-pc_test1.Messages()
  44. if test1_msg.Topic != "test" || test1_msg.Partition != 1 || string(test1_msg.Value) != "hello world again" {
  45. t.Error("Message was not as expected:", test1_msg)
  46. }
  47. pc_other0, err := consumer.ConsumePartition("other", 0, sarama.OffsetNewest)
  48. if err != nil {
  49. t.Fatal(err)
  50. }
  51. other0_msg := <-pc_other0.Messages()
  52. if other0_msg.Topic != "other" || other0_msg.Partition != 0 || string(other0_msg.Value) != "hello other" {
  53. t.Error("Message was not as expected:", other0_msg)
  54. }
  55. }
  56. func TestConsumerReturnsNonconsumedErrorsOnClose(t *testing.T) {
  57. consumer := NewConsumer(t, nil)
  58. consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldError(sarama.ErrOutOfBrokers)
  59. consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldError(sarama.ErrOutOfBrokers)
  60. pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
  61. if err != nil {
  62. t.Fatal(err)
  63. }
  64. select {
  65. case <-pc.Messages():
  66. t.Error("Did not epxect a message on the messages channel.")
  67. case err := <-pc.Errors():
  68. if err.Err != sarama.ErrOutOfBrokers {
  69. t.Error("Expected sarama.ErrOutOfBrokers, found", err)
  70. }
  71. }
  72. errs := pc.Close().(sarama.ConsumerErrors)
  73. if len(errs) != 1 && errs[0].Err != sarama.ErrOutOfBrokers {
  74. t.Error("Expected Close to return the remaining sarama.ErrOutOfBrokers")
  75. }
  76. }
  77. func TestConsumerWithoutExpectationsOnPartition(t *testing.T) {
  78. trm := newTestReporterMock()
  79. consumer := NewConsumer(trm, nil)
  80. _, err := consumer.ConsumePartition("test", 1, sarama.OffsetOldest)
  81. if err != errOutOfExpectations {
  82. t.Error("Expected ConsumePartition to return errOutOfExpectations")
  83. }
  84. if err := consumer.Close(); err != nil {
  85. t.Error("No error expected on close, but found:", err)
  86. }
  87. if len(trm.errors) != 1 {
  88. t.Errorf("Expected an expectation failure to be set on the error reporter.")
  89. }
  90. }
  91. func TestConsumerWithExpectationsOnUnconsumedPartition(t *testing.T) {
  92. trm := newTestReporterMock()
  93. consumer := NewConsumer(trm, nil)
  94. consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest).YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello world")})
  95. if err := consumer.Close(); err != nil {
  96. t.Error("No error expected on close, but found:", err)
  97. }
  98. if len(trm.errors) != 1 {
  99. t.Errorf("Expected an expectation failure to be set on the error reporter.")
  100. }
  101. }
  102. func TestConsumerWithWrongOffsetExpectation(t *testing.T) {
  103. trm := newTestReporterMock()
  104. consumer := NewConsumer(trm, nil)
  105. consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest)
  106. _, err := consumer.ConsumePartition("test", 0, sarama.OffsetNewest)
  107. if err != nil {
  108. t.Error("Did not expect error, found:", err)
  109. }
  110. if len(trm.errors) != 1 {
  111. t.Errorf("Expected an expectation failure to be set on the error reporter.")
  112. }
  113. if err := consumer.Close(); err != nil {
  114. t.Error(err)
  115. }
  116. }
  117. func TestConsumerViolatesMessagesDrainedExpectation(t *testing.T) {
  118. trm := newTestReporterMock()
  119. consumer := NewConsumer(trm, nil)
  120. pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest)
  121. pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")})
  122. pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")})
  123. pcmock.ExpectMessagesDrainedOnClose()
  124. pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
  125. if err != nil {
  126. t.Error(err)
  127. }
  128. // consume first message, not second one
  129. <-pc.Messages()
  130. if err := consumer.Close(); err != nil {
  131. t.Error(err)
  132. }
  133. if len(trm.errors) != 1 {
  134. t.Errorf("Expected an expectation failure to be set on the error reporter.")
  135. }
  136. }
  137. func TestConsumerMeetsErrorsDrainedExpectation(t *testing.T) {
  138. trm := newTestReporterMock()
  139. consumer := NewConsumer(trm, nil)
  140. pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest)
  141. pcmock.YieldError(sarama.ErrInvalidMessage)
  142. pcmock.YieldError(sarama.ErrInvalidMessage)
  143. pcmock.ExpectErrorsDrainedOnClose()
  144. pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest)
  145. if err != nil {
  146. t.Error(err)
  147. }
  148. // consume first and second error,
  149. <-pc.Errors()
  150. <-pc.Errors()
  151. if err := consumer.Close(); err != nil {
  152. t.Error(err)
  153. }
  154. if len(trm.errors) != 0 {
  155. t.Errorf("Expected ano expectation failures to be set on the error reporter.")
  156. }
  157. }