|
|
@@ -6,9 +6,15 @@ import (
|
|
|
"io"
|
|
|
"net"
|
|
|
"strconv"
|
|
|
- "testing"
|
|
|
)
|
|
|
|
|
|
+// TestState is a generic interface for a test state, implemented e.g. by testing.T
|
|
|
+type TestState interface {
|
|
|
+ Error(args ...interface{})
|
|
|
+ Fatal(args ...interface{})
|
|
|
+ Fatalf(format string, args ...interface{})
|
|
|
+}
|
|
|
+
|
|
|
// MockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
|
|
|
// accepts a single connection. It reads Kafka requests from that connection and returns each response
|
|
|
// from the channel provided at creation-time (if a response has a len of 0, nothing is sent, if a response
|
|
|
@@ -25,7 +31,7 @@ type MockBroker struct {
|
|
|
stopper chan bool
|
|
|
expectations chan encoder
|
|
|
listener net.Listener
|
|
|
- t *testing.T
|
|
|
+ t TestState
|
|
|
expecting encoder
|
|
|
}
|
|
|
|
|
|
@@ -118,10 +124,10 @@ func (b *MockBroker) serverError(err error, conn net.Conn) bool {
|
|
|
return false
|
|
|
}
|
|
|
|
|
|
-// New launches a fake Kafka broker. It takes a testing.T as provided by the
|
|
|
+// New launches a fake Kafka broker. It takes a TestState (e.g. *testing.T) as provided by the
|
|
|
// test framework and a channel of responses to use. If an error occurs it is
|
|
|
-// simply logged to the testing.T and the broker exits.
|
|
|
-func NewMockBroker(t *testing.T, brokerID int) *MockBroker {
|
|
|
+// simply logged to the TestState and the broker exits.
|
|
|
+func NewMockBroker(t TestState, brokerID int) *MockBroker {
|
|
|
var err error
|
|
|
|
|
|
broker := &MockBroker{
|