|
|
@@ -21,16 +21,25 @@ const (
|
|
|
|
|
|
type requestHandlerFunc func(req *request) (res encoder)
|
|
|
|
|
|
-// mockBroker is a mock Kafka broker. It consists of a TCP server on a
|
|
|
-// kernel-selected localhost port that can accept many connections. It reads
|
|
|
-// Kafka requests from that connection and passes them to the user specified
|
|
|
-// handler function (see SetHandler) that generates respective responses. If
|
|
|
-// the handler has not been explicitly specified then the broker returns
|
|
|
-// responses set by the Returns function in the exact order they were provided.
|
|
|
-// (if a response has a len of 0, nothing is sent, and the client request will
|
|
|
-// timeout in this case).
|
|
|
+// MockBroker is a mock Kafka broker that is used in unit tests. It is exposed
|
|
|
+// to facilitate testing of higher level or specialized consumers and producers
|
|
|
+// built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol,
|
|
|
+// but rather provides a facility to do that. It takes care of the TCP
|
|
|
+// transport, request unmarshaling, response marshaling, and makes it the test
|
|
|
+// writer responsibility to program correct according to the Kafka API protocol
|
|
|
+// MockBroker behaviour.
|
|
|
//
|
|
|
-// When running tests with one of these, it is strongly recommended to specify
|
|
|
+// MockBroker is implemented as a TCP server listening on a kernel-selected
|
|
|
+// localhost port that can accept many connections. It reads Kafka requests
|
|
|
+// from that connection and returns responses programmed by the SetHandlerByMap
|
|
|
+// function. If a MockBroker receives a request that it has no programmed
|
|
|
+// response for, then it returns nothing and the request times out.
|
|
|
+//
|
|
|
+// A set of MockRequest builders to define mappings used by MockBroker is
|
|
|
+// provided by Sarama. But users can develop MockRequests of their own and use
|
|
|
+// them along with or instead of the standard ones.
|
|
|
+//
|
|
|
+// When running tests with MockBroker it is strongly recommended to specify
|
|
|
// a timeout to `go test` so that if the broker hangs waiting for a response,
|
|
|
// the test panics.
|
|
|
//
|
|
|
@@ -50,15 +59,22 @@ type MockBroker struct {
|
|
|
lock sync.Mutex
|
|
|
}
|
|
|
|
|
|
+// RequestResponse represents a Request/Response pair processed by MockBroker.
|
|
|
type RequestResponse struct {
|
|
|
Request requestBody
|
|
|
Response encoder
|
|
|
}
|
|
|
|
|
|
+// SetLatency makes broker pause for the specified period every time before
|
|
|
+// replying.
|
|
|
func (b *MockBroker) SetLatency(latency time.Duration) {
|
|
|
b.latency = latency
|
|
|
}
|
|
|
|
|
|
+// SetHandlerByMap defines mapping of Request types to MockResponses. When a
|
|
|
+// request is received by the broker, it looks up the request type in the map
|
|
|
+// and uses the found MockResponse instance to generate an appropriate reply.
|
|
|
+// If the request type is not found in the map then nothing is sent.
|
|
|
func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
|
|
|
b.setHandler(func(req *request) (res encoder) {
|
|
|
reqTypeName := reflect.TypeOf(req.body).Elem().Name()
|
|
|
@@ -70,10 +86,15 @@ func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
+// BrokerID returns broker ID assigned to the broker.
|
|
|
func (b *MockBroker) BrokerID() int32 {
|
|
|
return b.brokerID
|
|
|
}
|
|
|
|
|
|
+// History returns a slice of RequestResponse pairs in the order they were
|
|
|
+// processed by the broker. Note that in case of multiple connections to the
|
|
|
+// broker the order expected by a test can be different from the order recorded
|
|
|
+// in the history, unless some synchronization is implemented in the test.
|
|
|
func (b *MockBroker) History() []RequestResponse {
|
|
|
b.lock.Lock()
|
|
|
history := make([]RequestResponse, len(b.history))
|
|
|
@@ -82,14 +103,18 @@ func (b *MockBroker) History() []RequestResponse {
|
|
|
return history
|
|
|
}
|
|
|
|
|
|
+// Port returns the TCP port number the broker is listening for requests on.
|
|
|
func (b *MockBroker) Port() int32 {
|
|
|
return b.port
|
|
|
}
|
|
|
|
|
|
+// Addr returns the broker connection string in the form "<address>:<port>".
|
|
|
func (b *MockBroker) Addr() string {
|
|
|
return b.listener.Addr().String()
|
|
|
}
|
|
|
|
|
|
+// Close terminates the broker blocking until it stops internal goroutines and
|
|
|
+// releases all resources.
|
|
|
func (b *MockBroker) Close() {
|
|
|
close(b.expectations)
|
|
|
if len(b.expectations) > 0 {
|