Ver código fonte

Check the received value in producers (synch/asych) mocks agains a regexp

Jacopo Silvestro 9 anos atrás
pai
commit
94957d3c23

+ 4 - 0
async_producer_test.go

@@ -93,6 +93,10 @@ func (f flakyEncoder) Encode() ([]byte, error) {
 	return []byte(TestMessage), nil
 }
 
+func (f flakyEncoder) String() string {
+	return string(TestMessage)
+}
+
 func TestAsyncProducer(t *testing.T) {
 	seedBroker := NewMockBroker(t, 1)
 	leader := NewMockBroker(t, 2)

+ 3 - 0
errors.go

@@ -37,6 +37,9 @@ var ErrShuttingDown = errors.New("kafka: message received by producer in process
 // ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max
 var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")
 
+// ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max
+var ErrRegexpMatch = errors.New("kafka: message is larger than Consumer.Fetch.Max")
+
 // PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
 // if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
 type PacketEncodingError struct {

+ 4 - 0
examples/http_server/http_server.go

@@ -162,6 +162,10 @@ func (ale *accessLogEntry) Encode() ([]byte, error) {
 	return ale.encoded, ale.err
 }
 
+func (ale *accessLogEntry) String() string {
+	return fmt.Sprintf("%+v\n", ale)
+}
+
 func (s *Server) withAccessLog(next http.Handler) http.Handler {
 
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

+ 41 - 8
mocks/async_producer.go

@@ -1,6 +1,7 @@
 package mocks
 
 import (
+	"regexp"
 	"sync"
 
 	"github.com/Shopify/sarama"
@@ -8,8 +9,10 @@ import (
 
 // AsyncProducer implements sarama's Producer interface for testing purposes.
 // Before you can send messages to it's Input channel, you have to set expectations
-// so it knows how to handle the input. This way you can easily test success and
-// failure scenarios.
+// so it knows how to handle the input; it returns and error if the numer of messages
+// received is bigger then the number of expectations set. You can also set a
+// regexp in each expectation so that the message value is matched against this
+// regexp and an error is returned if the match fails.
 type AsyncProducer struct {
 	l            sync.Mutex
 	t            ErrorReporter
@@ -52,6 +55,18 @@ func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer {
 			} else {
 				expectation := mp.expectations[0]
 				mp.expectations = mp.expectations[1:]
+				if len(expectation.MatchPattern) != 0 {
+					matched, err := regexp.MatchString(expectation.MatchPattern, msg.Value.String())
+					if err != nil {
+						mp.t.Errorf("Error while trying to match the input message with the expected pattern: " + err.Error())
+						mp.l.Unlock()
+						panic(err.Error())
+					}
+					if !matched {
+						mp.t.Errorf("Input value \"%s\" did not match expected pattern \"%s\"", msg.Value.String(), expectation.MatchPattern)
+						mp.errors <- &sarama.ProducerError{Err: errNoMatch, Msg: msg}
+					}
+				}
 				if expectation.Result == errProduceSuccess {
 					mp.lastOffset++
 					if config.Producer.Return.Successes {
@@ -122,21 +137,39 @@ func (mp *AsyncProducer) Errors() <-chan *sarama.ProducerError {
 // Setting expectations
 ////////////////////////////////////////////////
 
+// ExpectInputWithPatternAndSucceed sets an expectation on the mock producer that a message
+// with value matching a given regexp will be provided on the input channel. The mock producer
+// will first check the message value against the pattern. It will return an error if the matching
+// fails or handle the message as if it produced successfully, i.e. it will make it available
+// on the Successes channel if the Producer.Return.Successes setting is set to true.
+func (mp *AsyncProducer) ExpectInputWithPatternAndSucceed(pattern string) {
+	mp.l.Lock()
+	defer mp.l.Unlock()
+	mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess, MatchPattern: pattern})
+}
+
+// ExpectInputWithPatternAndFail sets an expectation on the mock producer that a message
+// with value matching a given regexp will be provided on the input channel. The mock producer
+// will first check the message value against the pattern. It will return an error if the matching
+// fails or handle the message as if it failed to produce successfully. This means it will make
+// a ProducerError available on the Errors channel.
+func (mp *AsyncProducer) ExpectInputWithPatternAndFail(pattern string, err error) {
+	mp.l.Lock()
+	defer mp.l.Unlock()
+	mp.expectations = append(mp.expectations, &producerExpectation{Result: err, MatchPattern: pattern})
+}
+
 // ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided
 // on the input channel. The mock producer will handle the message as if it is produced successfully,
 // i.e. it will make it available on the Successes channel if the Producer.Return.Successes setting
 // is set to true.
 func (mp *AsyncProducer) ExpectInputAndSucceed() {
-	mp.l.Lock()
-	defer mp.l.Unlock()
-	mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess})
+	mp.ExpectInputWithPatternAndSucceed("")
 }
 
 // ExpectInputAndFail sets an expectation on the mock producer that a message will be provided
 // on the input channel. The mock producer will handle the message as if it failed to produce
 // successfully. This means it will make a ProducerError available on the Errors channel.
 func (mp *AsyncProducer) ExpectInputAndFail(err error) {
-	mp.l.Lock()
-	defer mp.l.Unlock()
-	mp.expectations = append(mp.expectations, &producerExpectation{Result: err})
+	mp.ExpectInputWithPatternAndFail("", err)
 }

+ 13 - 0
mocks/async_producer_test.go

@@ -92,3 +92,16 @@ func TestProducerWithTooManyExpectations(t *testing.T) {
 		t.Error("Expected to report an error")
 	}
 }
+
+func TestProducerWithMatchPattern(t *testing.T) {
+	trm := newTestReporterMock()
+	mp := NewAsyncProducer(trm, nil)
+	mp.ExpectInputWithPatternAndSucceed("$tes")
+	mp.ExpectInputWithPatternAndFail("tes$", errNoMatch)
+
+	mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
+	mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
+	if err := mp.Close(); err != nil {
+		t.Error(err)
+	}
+}

+ 3 - 1
mocks/mocks.go

@@ -29,12 +29,14 @@ var (
 	errProduceSuccess              error = nil
 	errOutOfExpectations                 = errors.New("No more expectations set on mock")
 	errPartitionConsumerNotStarted       = errors.New("The partition consumer was never started")
+	errNoMatch                           = errors.New("The input message value did not match with the expected pattern")
 )
 
 const AnyOffset int64 = -1000
 
 type producerExpectation struct {
-	Result error
+	Result       error
+	MatchPattern string
 }
 
 type consumerExpectation struct {

+ 37 - 8
mocks/sync_producer.go

@@ -1,6 +1,7 @@
 package mocks
 
 import (
+	"regexp"
 	"sync"
 
 	"github.com/Shopify/sarama"
@@ -34,7 +35,9 @@ func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer {
 
 // SendMessage corresponds with the SendMessage method of sarama's SyncProducer implementation.
 // You have to set expectations on the mock producer before calling SendMessage, so it knows
-// how to handle them. If there is no more remaining expectations when SendMessage is called,
+// how to handle them. You can set a regexp in each expectation so that the message value
+// is matched against this regexp and an error is returned if the match fails.
+// If there is no more remaining expectation when SendMessage is called,
 // the mock producer will write an error to the test state object.
 func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
 	sp.l.Lock()
@@ -43,7 +46,17 @@ func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int3
 	if len(sp.expectations) > 0 {
 		expectation := sp.expectations[0]
 		sp.expectations = sp.expectations[1:]
-
+		if len(expectation.MatchPattern) != 0 {
+			matched, err := regexp.MatchString(expectation.MatchPattern, msg.Value.String())
+			if err != nil {
+				sp.t.Errorf("Error while trying to match the input message with the expected pattern: " + err.Error())
+				panic(err.Error())
+			}
+			if !matched {
+				sp.t.Errorf("Input value \"%s\" did not match expected pattern \"%s\"", msg.Value.String(), expectation.MatchPattern)
+				return -1, -1, errNoMatch
+			}
+		}
 		if expectation.Result == errProduceSuccess {
 			sp.lastOffset++
 			msg.Offset = sp.lastOffset
@@ -75,20 +88,36 @@ func (sp *SyncProducer) Close() error {
 // Setting expectations
 ////////////////////////////////////////////////
 
+// ExpectSendMessageWithPatternAndSucceed sets an expectation on the mock producer that SendMessage
+// will be called with a message value matching a given regexp. The mock producer will first check the
+// message value against the pattern. It will return an error if the matching fails or handle
+// the message as if it produced successfully, i.e. by returning a valid partition, and offset, and a nil error.
+func (sp *SyncProducer) ExpectSendMessageWithPatternAndSucceed(pattern string) {
+	sp.l.Lock()
+	defer sp.l.Unlock()
+	sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess, MatchPattern: pattern})
+}
+
+// ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be
+// called with a message value matching a given regexp. The mock producer will first check the
+// message value against the pattern. It will return an error if the matching fails or handle
+// the message as if it failed to produce successfully, i.e. by returning the provided error.
+func (sp *SyncProducer) ExpectSendMessageWithPatternAndFail(pattern string, err error) {
+	sp.l.Lock()
+	defer sp.l.Unlock()
+	sp.expectations = append(sp.expectations, &producerExpectation{Result: err, MatchPattern: pattern})
+}
+
 // ExpectSendMessageAndSucceed sets an expectation on the mock producer that SendMessage will be
 // called. The mock producer will handle the message as if it produced successfully, i.e. by
 // returning a valid partition, and offset, and a nil error.
 func (sp *SyncProducer) ExpectSendMessageAndSucceed() {
-	sp.l.Lock()
-	defer sp.l.Unlock()
-	sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess})
+	sp.ExpectSendMessageWithPatternAndSucceed("")
 }
 
 // ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be
 // called. The mock producer will handle the message as if it failed to produce
 // successfully, i.e. by returning the provided error.
 func (sp *SyncProducer) ExpectSendMessageAndFail(err error) {
-	sp.l.Lock()
-	defer sp.l.Unlock()
-	sp.expectations = append(sp.expectations, &producerExpectation{Result: err})
+	sp.ExpectSendMessageWithPatternAndFail("", err)
 }

+ 25 - 0
mocks/sync_producer_test.go

@@ -96,3 +96,28 @@ func TestSyncProducerWithTooFewExpectations(t *testing.T) {
 		t.Error("Expected to report an error")
 	}
 }
+
+func TestSyncProducerWithPattern(t *testing.T) {
+	trm := newTestReporterMock()
+
+	sp := NewSyncProducer(trm, nil)
+	sp.ExpectSendMessageWithPatternAndSucceed("^tes")
+	sp.ExpectSendMessageWithPatternAndSucceed("^tes$")
+
+	msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
+	if _, _, err := sp.SendMessage(msg); err != nil {
+		t.Error("No error expected on first SendMessage call", err)
+	}
+	msg = &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
+	if _, _, err := sp.SendMessage(msg); err != errNoMatch {
+		t.Error("errNoMatch expected on second SendMessage call, found:", err)
+	}
+
+	if err := sp.Close(); err != nil {
+		t.Error(err)
+	}
+
+	if len(trm.errors) != 1 {
+		t.Error("Expected to report an error")
+	}
+}

+ 9 - 0
utils.go

@@ -63,6 +63,7 @@ func safeAsyncClose(b *Broker) {
 type Encoder interface {
 	Encode() ([]byte, error)
 	Length() int
+	String() string
 }
 
 // make strings and byte slices encodable for convenience so they can be used as keys
@@ -80,6 +81,10 @@ func (s StringEncoder) Length() int {
 	return len(s)
 }
 
+func (s StringEncoder) String() string {
+	return string(s)
+}
+
 // ByteEncoder implements the Encoder interface for Go byte slices so that they can be used
 // as the Key or Value in a ProducerMessage.
 type ByteEncoder []byte
@@ -92,6 +97,10 @@ func (b ByteEncoder) Length() int {
 	return len(b)
 }
 
+func (s ByteEncoder) String() string {
+	return string(s)
+}
+
 // bufConn wraps a net.Conn with a buffer for reads to reduce the number of
 // reads that trigger syscalls.
 type bufConn struct {