浏览代码

Address comments

Jacopo Silvestro 9 年之前
父节点
当前提交
7aeedb0f4a
共有 9 个文件被更改,包括 92 次插入86 次删除
  1. 0 4
      async_producer_test.go
  2. 0 3
      errors.go
  3. 0 4
      examples/http_server/http_server.go
  4. 28 29
      mocks/async_producer.go
  5. 13 3
      mocks/async_producer_test.go
  6. 19 3
      mocks/mocks.go
  7. 25 25
      mocks/sync_producer.go
  8. 7 6
      mocks/sync_producer_test.go
  9. 0 9
      utils.go

+ 0 - 4
async_producer_test.go

@@ -93,10 +93,6 @@ func (f flakyEncoder) Encode() ([]byte, error) {
 	return []byte(TestMessage), nil
 }
 
-func (f flakyEncoder) String() string {
-	return string(TestMessage)
-}
-
 func TestAsyncProducer(t *testing.T) {
 	seedBroker := NewMockBroker(t, 1)
 	leader := NewMockBroker(t, 2)

+ 0 - 3
errors.go

@@ -37,9 +37,6 @@ var ErrShuttingDown = errors.New("kafka: message received by producer in process
 // ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max
 var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max")
 
-// ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max
-var ErrRegexpMatch = errors.New("kafka: message is larger than Consumer.Fetch.Max")
-
 // PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
 // if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
 type PacketEncodingError struct {

+ 0 - 4
examples/http_server/http_server.go

@@ -162,10 +162,6 @@ func (ale *accessLogEntry) Encode() ([]byte, error) {
 	return ale.encoded, ale.err
 }
 
-func (ale *accessLogEntry) String() string {
-	return fmt.Sprintf("%+v\n", ale)
-}
-
 func (s *Server) withAccessLog(next http.Handler) http.Handler {
 
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

+ 28 - 29
mocks/async_producer.go

@@ -1,7 +1,6 @@
 package mocks
 
 import (
-	"regexp"
 	"sync"
 
 	"github.com/Shopify/sarama"
@@ -11,8 +10,8 @@ import (
 // Before you can send messages to it's Input channel, you have to set expectations
 // so it knows how to handle the input; it returns and error if the numer of messages
 // received is bigger then the number of expectations set. You can also set a
-// regexp in each expectation so that the message value is matched against this
-// regexp and an error is returned if the match fails.
+// function in each expectation so that the message value is checked by this function
+// and an error is returned if the match fails.
 type AsyncProducer struct {
 	l            sync.Mutex
 	t            ErrorReporter
@@ -55,16 +54,16 @@ func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer {
 			} else {
 				expectation := mp.expectations[0]
 				mp.expectations = mp.expectations[1:]
-				if len(expectation.MatchPattern) != 0 {
-					matched, err := regexp.MatchString(expectation.MatchPattern, msg.Value.String())
-					if err != nil {
-						mp.t.Errorf("Error while trying to match the input message with the expected pattern: " + err.Error())
-						mp.l.Unlock()
-						panic(err.Error())
-					}
-					if !matched {
-						mp.t.Errorf("Input value \"%s\" did not match expected pattern \"%s\"", msg.Value.String(), expectation.MatchPattern)
-						mp.errors <- &sarama.ProducerError{Err: errNoMatch, Msg: msg}
+				if expectation.CheckFunction != nil {
+					if val, err := msg.Value.Encode(); err != nil {
+						mp.t.Errorf("Input message encoding failed: %s", err.Error())
+						mp.errors <- &sarama.ProducerError{Err: err, Msg: msg}
+					} else {
+						err = expectation.CheckFunction(val)
+						if err != nil {
+							mp.t.Errorf("Check function returned an error: %s", err.Error())
+							mp.errors <- &sarama.ProducerError{Err: err, Msg: msg}
+						}
 					}
 				}
 				if expectation.Result == errProduceSuccess {
@@ -137,26 +136,26 @@ func (mp *AsyncProducer) Errors() <-chan *sarama.ProducerError {
 // Setting expectations
 ////////////////////////////////////////////////
 
-// ExpectInputWithPatternAndSucceed sets an expectation on the mock producer that a message
-// with value matching a given regexp will be provided on the input channel. The mock producer
-// will first check the message value against the pattern. It will return an error if the matching
-// fails or handle the message as if it produced successfully, i.e. it will make it available
-// on the Successes channel if the Producer.Return.Successes setting is set to true.
-func (mp *AsyncProducer) ExpectInputWithPatternAndSucceed(pattern string) {
+// ExpectInputWithCheckerFunctionAndSucceed sets an expectation on the mock producer that a message
+// will be provided on the input channel. The mock producer will call the given function to check
+// the message value. If an error is returned it will be made available on the Errors channel
+// otherwise the mock will handle the message as if it produced successfully, i.e. it will make
+// it available on the Successes channel if the Producer.Return.Successes setting is set to true.
+func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecker) {
 	mp.l.Lock()
 	defer mp.l.Unlock()
-	mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess, MatchPattern: pattern})
+	mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf})
 }
 
-// ExpectInputWithPatternAndFail sets an expectation on the mock producer that a message
-// with value matching a given regexp will be provided on the input channel. The mock producer
-// will first check the message value against the pattern. It will return an error if the matching
-// fails or handle the message as if it failed to produce successfully. This means it will make
-// a ProducerError available on the Errors channel.
-func (mp *AsyncProducer) ExpectInputWithPatternAndFail(pattern string, err error) {
+// ExpectInputWithCheckerFunctionAndSucceed sets an expectation on the mock producer that a message
+// will be provided on the input channel. The mock producer will first call the given function to
+// check the message value. If an error is returned it will be made available on the Errors channel
+// otherwise the mock will handle the message as if it failed to produce successfully. This means
+// it will make a ProducerError available on the Errors channel.
+func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndFail(cf ValueChecker, err error) {
 	mp.l.Lock()
 	defer mp.l.Unlock()
-	mp.expectations = append(mp.expectations, &producerExpectation{Result: err, MatchPattern: pattern})
+	mp.expectations = append(mp.expectations, &producerExpectation{Result: err, CheckFunction: cf})
 }
 
 // ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided
@@ -164,12 +163,12 @@ func (mp *AsyncProducer) ExpectInputWithPatternAndFail(pattern string, err error
 // i.e. it will make it available on the Successes channel if the Producer.Return.Successes setting
 // is set to true.
 func (mp *AsyncProducer) ExpectInputAndSucceed() {
-	mp.ExpectInputWithPatternAndSucceed("")
+	mp.ExpectInputWithCheckerFunctionAndSucceed(nil)
 }
 
 // ExpectInputAndFail sets an expectation on the mock producer that a message will be provided
 // on the input channel. The mock producer will handle the message as if it failed to produce
 // successfully. This means it will make a ProducerError available on the Errors channel.
 func (mp *AsyncProducer) ExpectInputAndFail(err error) {
-	mp.ExpectInputWithPatternAndFail("", err)
+	mp.ExpectInputWithCheckerFunctionAndFail(nil, err)
 }

+ 13 - 3
mocks/async_producer_test.go

@@ -2,6 +2,7 @@ package mocks
 
 import (
 	"fmt"
+	"strings"
 	"testing"
 
 	"github.com/Shopify/sarama"
@@ -93,15 +94,24 @@ func TestProducerWithTooManyExpectations(t *testing.T) {
 	}
 }
 
-func TestProducerWithMatchPattern(t *testing.T) {
+func TestProducerWithCheckerFunction(t *testing.T) {
 	trm := newTestReporterMock()
 	mp := NewAsyncProducer(trm, nil)
-	mp.ExpectInputWithPatternAndSucceed("$tes")
-	mp.ExpectInputWithPatternAndFail("tes$", errNoMatch)
+	mp.ExpectInputWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes"))
+	mp.ExpectInputWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$"))
 
 	mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
 	mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
 	if err := mp.Close(); err != nil {
 		t.Error(err)
 	}
+
+	if len(mp.Errors()) != 1 {
+		t.Error("Expected to report an error")
+	}
+
+	err1 := <-mp.Errors()
+	if !strings.HasPrefix(err1.Err.Error(), "No match") {
+		t.Error("Expected to report a value check error, found: ", err1.Err)
+	}
 }

+ 19 - 3
mocks/mocks.go

@@ -15,6 +15,8 @@ package mocks
 
 import (
 	"errors"
+	"fmt"
+	"regexp"
 
 	"github.com/Shopify/sarama"
 )
@@ -25,18 +27,32 @@ type ErrorReporter interface {
 	Errorf(string, ...interface{})
 }
 
+type ValueChecker func(val []byte) error
+
+func generateRegexpChecker(re string) func([]byte) error {
+	return func(val []byte) error {
+		matched, err := regexp.MatchString(re, string(val))
+		if err != nil {
+			return errors.New("Error while trying to match the input message with the expected pattern: " + err.Error())
+		}
+		if !matched {
+			return fmt.Errorf("No match between input value \"%s\" and expected pattern \"%s\"", val, re)
+		}
+		return nil
+	}
+}
+
 var (
 	errProduceSuccess              error = nil
 	errOutOfExpectations                 = errors.New("No more expectations set on mock")
 	errPartitionConsumerNotStarted       = errors.New("The partition consumer was never started")
-	errNoMatch                           = errors.New("The input message value did not match with the expected pattern")
 )
 
 const AnyOffset int64 = -1000
 
 type producerExpectation struct {
-	Result       error
-	MatchPattern string
+	Result        error
+	CheckFunction ValueChecker
 }
 
 type consumerExpectation struct {

+ 25 - 25
mocks/sync_producer.go

@@ -1,7 +1,6 @@
 package mocks
 
 import (
-	"regexp"
 	"sync"
 
 	"github.com/Shopify/sarama"
@@ -35,8 +34,8 @@ func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer {
 
 // SendMessage corresponds with the SendMessage method of sarama's SyncProducer implementation.
 // You have to set expectations on the mock producer before calling SendMessage, so it knows
-// how to handle them. You can set a regexp in each expectation so that the message value
-// is matched against this regexp and an error is returned if the match fails.
+// how to handle them. You can set a function in each expectation so that the message value
+// checked by this function and an error is returned if the match fails.
 // If there is no more remaining expectation when SendMessage is called,
 // the mock producer will write an error to the test state object.
 func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
@@ -46,15 +45,16 @@ func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int3
 	if len(sp.expectations) > 0 {
 		expectation := sp.expectations[0]
 		sp.expectations = sp.expectations[1:]
-		if len(expectation.MatchPattern) != 0 {
-			matched, err := regexp.MatchString(expectation.MatchPattern, msg.Value.String())
-			if err != nil {
-				sp.t.Errorf("Error while trying to match the input message with the expected pattern: " + err.Error())
-				panic(err.Error())
-			}
-			if !matched {
-				sp.t.Errorf("Input value \"%s\" did not match expected pattern \"%s\"", msg.Value.String(), expectation.MatchPattern)
-				return -1, -1, errNoMatch
+		if expectation.CheckFunction != nil {
+			if val, err := msg.Value.Encode(); err != nil {
+				sp.t.Errorf("Input message encoding failed: %s", err.Error())
+				return -1, -1, err
+			} else {
+				err := expectation.CheckFunction(val)
+				if err != nil {
+					sp.t.Errorf("Check function returned an error: %s", err.Error())
+					return -1, -1, err
+				}
 			}
 		}
 		if expectation.Result == errProduceSuccess {
@@ -88,36 +88,36 @@ func (sp *SyncProducer) Close() error {
 // Setting expectations
 ////////////////////////////////////////////////
 
-// ExpectSendMessageWithPatternAndSucceed sets an expectation on the mock producer that SendMessage
-// will be called with a message value matching a given regexp. The mock producer will first check the
-// message value against the pattern. It will return an error if the matching fails or handle
-// the message as if it produced successfully, i.e. by returning a valid partition, and offset, and a nil error.
-func (sp *SyncProducer) ExpectSendMessageWithPatternAndSucceed(pattern string) {
+// ExpectSendMessageWithCheckerFunctionAndSucceed sets an expectation on the mock producer that SendMessage
+// will be called. The mock producer will first call the given function to check the message value.
+// It will cascade the error of the function, if any, or handle the message as if it produced
+// successfully, i.e. by returning a valid partition, and offset, and a nil error.
+func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) {
 	sp.l.Lock()
 	defer sp.l.Unlock()
-	sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess, MatchPattern: pattern})
+	sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf})
 }
 
 // ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be
-// called with a message value matching a given regexp. The mock producer will first check the
-// message value against the pattern. It will return an error if the matching fails or handle
-// the message as if it failed to produce successfully, i.e. by returning the provided error.
-func (sp *SyncProducer) ExpectSendMessageWithPatternAndFail(pattern string, err error) {
+// called. The mock producer will first call the given function to check the message value.
+// It will cascade the error of the function, if any, or handle the message as if it failed
+// to produce successfully, i.e. by returning the provided error.
+func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) {
 	sp.l.Lock()
 	defer sp.l.Unlock()
-	sp.expectations = append(sp.expectations, &producerExpectation{Result: err, MatchPattern: pattern})
+	sp.expectations = append(sp.expectations, &producerExpectation{Result: err, CheckFunction: cf})
 }
 
 // ExpectSendMessageAndSucceed sets an expectation on the mock producer that SendMessage will be
 // called. The mock producer will handle the message as if it produced successfully, i.e. by
 // returning a valid partition, and offset, and a nil error.
 func (sp *SyncProducer) ExpectSendMessageAndSucceed() {
-	sp.ExpectSendMessageWithPatternAndSucceed("")
+	sp.ExpectSendMessageWithCheckerFunctionAndSucceed(nil)
 }
 
 // ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be
 // called. The mock producer will handle the message as if it failed to produce
 // successfully, i.e. by returning the provided error.
 func (sp *SyncProducer) ExpectSendMessageAndFail(err error) {
-	sp.ExpectSendMessageWithPatternAndFail("", err)
+	sp.ExpectSendMessageWithCheckerFunctionAndFail(nil, err)
 }

+ 7 - 6
mocks/sync_producer_test.go

@@ -1,6 +1,7 @@
 package mocks
 
 import (
+	"strings"
 	"testing"
 
 	"github.com/Shopify/sarama"
@@ -97,20 +98,20 @@ func TestSyncProducerWithTooFewExpectations(t *testing.T) {
 	}
 }
 
-func TestSyncProducerWithPattern(t *testing.T) {
+func TestSyncProducerWithCheckerFunction(t *testing.T) {
 	trm := newTestReporterMock()
 
 	sp := NewSyncProducer(trm, nil)
-	sp.ExpectSendMessageWithPatternAndSucceed("^tes")
-	sp.ExpectSendMessageWithPatternAndSucceed("^tes$")
+	sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes"))
+	sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$"))
 
 	msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
 	if _, _, err := sp.SendMessage(msg); err != nil {
-		t.Error("No error expected on first SendMessage call", err)
+		t.Error("No error expected on first SendMessage call, found: ", err)
 	}
 	msg = &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")}
-	if _, _, err := sp.SendMessage(msg); err != errNoMatch {
-		t.Error("errNoMatch expected on second SendMessage call, found:", err)
+	if _, _, err := sp.SendMessage(msg); err == nil || !strings.HasPrefix(err.Error(), "No match") {
+		t.Error("Error during value check expected on second SendMessage call, found:", err)
 	}
 
 	if err := sp.Close(); err != nil {

+ 0 - 9
utils.go

@@ -63,7 +63,6 @@ func safeAsyncClose(b *Broker) {
 type Encoder interface {
 	Encode() ([]byte, error)
 	Length() int
-	String() string
 }
 
 // make strings and byte slices encodable for convenience so they can be used as keys
@@ -81,10 +80,6 @@ func (s StringEncoder) Length() int {
 	return len(s)
 }
 
-func (s StringEncoder) String() string {
-	return string(s)
-}
-
 // ByteEncoder implements the Encoder interface for Go byte slices so that they can be used
 // as the Key or Value in a ProducerMessage.
 type ByteEncoder []byte
@@ -97,10 +92,6 @@ func (b ByteEncoder) Length() int {
 	return len(b)
 }
 
-func (s ByteEncoder) String() string {
-	return string(s)
-}
-
 // bufConn wraps a net.Conn with a buffer for reads to reduce the number of
 // reads that trigger syscalls.
 type bufConn struct {