sync_producer_test.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. package mocks
  2. import (
  3. "errors"
  4. "strings"
  5. "testing"
  6. "github.com/Shopify/sarama"
  7. )
  8. func TestMockSyncProducerImplementsSyncProducerInterface(t *testing.T) {
  9. var mp interface{} = &SyncProducer{}
  10. if _, ok := mp.(sarama.SyncProducer); !ok {
  11. t.Error("The mock async producer should implement the sarama.SyncProducer interface.")
  12. }
  13. }
  14. func TestSyncProducerReturnsExpectationsToSendMessage(t *testing.T) {
  15. sp := NewSyncProducer(t, nil)
  16. defer func() {
  17. if err := sp.Close(); err != nil {
  18. t.Error(err)
  19. }
  20. }()
  21. sp.ExpectSendMessageAndSucceed()
  22. sp.ExpectSendMessageAndSucceed()
  23. sp.ExpectSendMessageAndFail(sarama.ErrOutOfBrokers)
  24. msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
  25. _, offset, err := sp.SendMessage(msg)
  26. if err != nil {
  27. t.Errorf("The first message should have been produced successfully, but got %s", err)
  28. }
  29. if offset != 1 || offset != msg.Offset {
  30. t.Errorf("The first message should have been assigned offset 1, but got %d", msg.Offset)
  31. }
  32. _, offset, err = sp.SendMessage(msg)
  33. if err != nil {
  34. t.Errorf("The second message should have been produced successfully, but got %s", err)
  35. }
  36. if offset != 2 || offset != msg.Offset {
  37. t.Errorf("The second message should have been assigned offset 2, but got %d", offset)
  38. }
  39. _, _, err = sp.SendMessage(msg)
  40. if err != sarama.ErrOutOfBrokers {
  41. t.Errorf("The third message should not have been produced successfully")
  42. }
  43. if err := sp.Close(); err != nil {
  44. t.Error(err)
  45. }
  46. }
  47. func TestSyncProducerWithTooManyExpectations(t *testing.T) {
  48. trm := newTestReporterMock()
  49. sp := NewSyncProducer(trm, nil)
  50. sp.ExpectSendMessageAndSucceed()
  51. sp.ExpectSendMessageAndFail(sarama.ErrOutOfBrokers)
  52. msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
  53. if _, _, err := sp.SendMessage(msg); err != nil {
  54. t.Error("No error expected on first SendMessage call", err)
  55. }
  56. if err := sp.Close(); err != nil {
  57. t.Error(err)
  58. }
  59. if len(trm.errors) != 1 {
  60. t.Error("Expected to report an error")
  61. }
  62. }
  63. func TestSyncProducerWithTooFewExpectations(t *testing.T) {
  64. trm := newTestReporterMock()
  65. sp := NewSyncProducer(trm, nil)
  66. sp.ExpectSendMessageAndSucceed()
  67. msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
  68. if _, _, err := sp.SendMessage(msg); err != nil {
  69. t.Error("No error expected on first SendMessage call", err)
  70. }
  71. if _, _, err := sp.SendMessage(msg); err != errOutOfExpectations {
  72. t.Error("errOutOfExpectations expected on second SendMessage call, found:", err)
  73. }
  74. if err := sp.Close(); err != nil {
  75. t.Error(err)
  76. }
  77. if len(trm.errors) != 1 {
  78. t.Error("Expected to report an error")
  79. }
  80. }
  81. func TestSyncProducerWithCheckerFunction(t *testing.T) {
  82. trm := newTestReporterMock()
  83. sp := NewSyncProducer(trm, nil)
  84. sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes"))
  85. sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$"))
  86. msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
  87. if _, _, err := sp.SendMessage(msg); err != nil {
  88. t.Error("No error expected on first SendMessage call, found: ", err)
  89. }
  90. msg = &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
  91. if _, _, err := sp.SendMessage(msg); err == nil || !strings.HasPrefix(err.Error(), "No match") {
  92. t.Error("Error during value check expected on second SendMessage call, found:", err)
  93. }
  94. if err := sp.Close(); err != nil {
  95. t.Error(err)
  96. }
  97. if len(trm.errors) != 1 {
  98. t.Error("Expected to report an error")
  99. }
  100. }
  101. func TestSyncProducerWithCheckerFunctionForSendMessagesWithError(t *testing.T) {
  102. trm := newTestReporterMock()
  103. sp := NewSyncProducer(trm, nil)
  104. sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes"))
  105. sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$"))
  106. msg1 := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
  107. msg2 := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
  108. msgs := []*sarama.ProducerMessage{msg1, msg2}
  109. if err := sp.SendMessages(msgs); err == nil || !strings.HasPrefix(err.Error(), "No match") {
  110. t.Error("Error during value check expected on second message, found: ", err)
  111. }
  112. if err := sp.Close(); err != nil {
  113. t.Error(err)
  114. }
  115. if len(trm.errors) != 1 {
  116. t.Error("Expected to report an error")
  117. }
  118. }
  119. func TestSyncProducerWithCheckerFunctionForSendMessagesWithoutError(t *testing.T) {
  120. trm := newTestReporterMock()
  121. sp := NewSyncProducer(trm, nil)
  122. sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes"))
  123. msg1 := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
  124. msgs := []*sarama.ProducerMessage{msg1}
  125. if err := sp.SendMessages(msgs); err != nil {
  126. t.Error("No error expected on SendMessages call, found: ", err)
  127. }
  128. if err := sp.Close(); err != nil {
  129. t.Error(err)
  130. }
  131. if len(trm.errors) != 0 {
  132. t.Errorf("Expected to not report any errors, found: %v", trm.errors)
  133. }
  134. }
  135. func TestSyncProducerSendMessagesExpectationsMismatchTooFew(t *testing.T) {
  136. trm := newTestReporterMock()
  137. sp := NewSyncProducer(trm, nil)
  138. sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes"))
  139. msg1 := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
  140. msg2 := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
  141. msgs := []*sarama.ProducerMessage{msg1, msg2}
  142. if err := sp.SendMessages(msgs); err == nil {
  143. t.Error("Error during value check expected on second message, found: ", err)
  144. }
  145. if err := sp.Close(); err != nil {
  146. t.Error(err)
  147. }
  148. if len(trm.errors) != 2 {
  149. t.Error("Expected to report 2 errors")
  150. }
  151. }
  152. func TestSyncProducerSendMessagesExpectationsMismatchTooMany(t *testing.T) {
  153. trm := newTestReporterMock()
  154. sp := NewSyncProducer(trm, nil)
  155. sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes"))
  156. sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes"))
  157. msg1 := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
  158. msgs := []*sarama.ProducerMessage{msg1}
  159. if err := sp.SendMessages(msgs); err != nil {
  160. t.Error("No error expected on SendMessages call, found: ", err)
  161. }
  162. if err := sp.Close(); err != nil {
  163. t.Error(err)
  164. }
  165. if len(trm.errors) != 1 {
  166. t.Error("Expected to report 1 errors")
  167. }
  168. }
  169. func TestSyncProducerSendMessagesFaultyEncoder(t *testing.T) {
  170. trm := newTestReporterMock()
  171. sp := NewSyncProducer(trm, nil)
  172. sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes"))
  173. msg1 := &sarama.ProducerMessage{Topic: "test", Value: faultyEncoder("123")}
  174. msgs := []*sarama.ProducerMessage{msg1}
  175. if err := sp.SendMessages(msgs); err == nil || !strings.HasPrefix(err.Error(), "encode error") {
  176. t.Error("Encoding error expected, found: ", err)
  177. }
  178. if err := sp.Close(); err != nil {
  179. t.Error(err)
  180. }
  181. if len(trm.errors) != 1 {
  182. t.Error("Expected to report 1 errors")
  183. }
  184. }
  185. type faultyEncoder []byte
  186. func (f faultyEncoder) Encode() ([]byte, error) {
  187. return nil, errors.New("encode error")
  188. }
  189. func (f faultyEncoder) Length() int {
  190. return len(f)
  191. }