Browse Source

Unexport mockbroker.

Willem van Bergen 10 năm trước cách đây
mục cha
commit
91a706483c
5 tập tin đã thay đổi với 61 bổ sung61 xóa
  1. 1 1
      broker_test.go
  2. 9 9
      client_test.go
  3. 9 9
      consumer_test.go
  4. 19 19
      mockbroker.go
  5. 23 23
      producer_test.go

+ 1 - 1
broker_test.go

@@ -51,7 +51,7 @@ func TestBrokerAccessors(t *testing.T) {
 }
 
 func TestSimpleBrokerCommunication(t *testing.T) {
-	mb := NewMockBroker(t, 0)
+	mb := newMockBroker(t, 0)
 	defer mb.Close()
 
 	broker := NewBroker(mb.Addr())

+ 9 - 9
client_test.go

@@ -20,7 +20,7 @@ func TestDefaultClientConfigValidates(t *testing.T) {
 }
 
 func TestSimpleClient(t *testing.T) {
-	seedBroker := NewMockBroker(t, 1)
+	seedBroker := newMockBroker(t, 1)
 
 	seedBroker.Returns(new(MetadataResponse))
 
@@ -34,8 +34,8 @@ func TestSimpleClient(t *testing.T) {
 }
 
 func TestCachedPartitions(t *testing.T) {
-	seedBroker := NewMockBroker(t, 1)
-	leader := NewMockBroker(t, 5)
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 5)
 
 	replicas := []int32{3, 1, 5}
 	isr := []int32{5, 1}
@@ -74,8 +74,8 @@ func TestCachedPartitions(t *testing.T) {
 }
 
 func TestClientSeedBrokers(t *testing.T) {
-	seedBroker := NewMockBroker(t, 1)
-	discoveredBroker := NewMockBroker(t, 2)
+	seedBroker := newMockBroker(t, 1)
+	discoveredBroker := newMockBroker(t, 2)
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(discoveredBroker.Addr(), discoveredBroker.BrokerID())
@@ -92,8 +92,8 @@ func TestClientSeedBrokers(t *testing.T) {
 }
 
 func TestClientMetadata(t *testing.T) {
-	seedBroker := NewMockBroker(t, 1)
-	leader := NewMockBroker(t, 5)
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 5)
 
 	replicas := []int32{3, 1, 5}
 	isr := []int32{5, 1}
@@ -165,8 +165,8 @@ func TestClientMetadata(t *testing.T) {
 }
 
 func TestClientRefreshBehaviour(t *testing.T) {
-	seedBroker := NewMockBroker(t, 1)
-	leader := NewMockBroker(t, 5)
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 5)
 
 	metadataResponse1 := new(MetadataResponse)
 	metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())

+ 9 - 9
consumer_test.go

@@ -22,8 +22,8 @@ func TestDefaultPartitionConsumerConfigValidates(t *testing.T) {
 }
 
 func TestConsumerOffsetManual(t *testing.T) {
-	seedBroker := NewMockBroker(t, 1)
-	leader := NewMockBroker(t, 2)
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
@@ -73,8 +73,8 @@ func TestConsumerOffsetManual(t *testing.T) {
 }
 
 func TestConsumerLatestOffset(t *testing.T) {
-	seedBroker := NewMockBroker(t, 1)
-	leader := NewMockBroker(t, 2)
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
@@ -121,8 +121,8 @@ func TestConsumerFunnyOffsets(t *testing.T) {
 	// for topics that are compressed and/or compacted (different things!) we have to be
 	// able to handle receiving offsets that are non-sequential (though still strictly increasing) and
 	// possibly starting prior to the actual value we requested
-	seedBroker := NewMockBroker(t, 1)
-	leader := NewMockBroker(t, 2)
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
@@ -166,9 +166,9 @@ func TestConsumerFunnyOffsets(t *testing.T) {
 
 func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	// initial setup
-	seedBroker := NewMockBroker(t, 1)
-	leader0 := NewMockBroker(t, 2)
-	leader1 := NewMockBroker(t, 3)
+	seedBroker := newMockBroker(t, 1)
+	leader0 := newMockBroker(t, 2)
+	leader1 := newMockBroker(t, 3)
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())

+ 19 - 19
mockbroker.go

@@ -9,15 +9,15 @@ import (
 	"time"
 )
 
-// TestState is a generic interface for a test state, implemented e.g. by testing.T
-type TestState interface {
+// testState is a generic interface for a test state, implemented e.g. by testing.T
+type testState interface {
 	Error(args ...interface{})
 	Fatal(args ...interface{})
 	Errorf(format string, args ...interface{})
 	Fatalf(format string, args ...interface{})
 }
 
-// MockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
+// mockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
 // accepts a single connection. It reads Kafka requests from that connection and returns each response
 // from the channel provided at creation-time (if a response has a len of 0, nothing is sent, if a response
 // the server sleeps for 250ms instead of reading a request).
@@ -27,33 +27,33 @@ type TestState interface {
 //
 // It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
 // automatically as a convenience.
-type MockBroker struct {
+type mockBroker struct {
 	brokerID     int32
 	port         int32
 	stopper      chan bool
 	expectations chan encoder
 	listener     net.Listener
-	t            TestState
+	t            testState
 	latency      time.Duration
 }
 
-func (b *MockBroker) SetLatency(latency time.Duration) {
+func (b *mockBroker) SetLatency(latency time.Duration) {
 	b.latency = latency
 }
 
-func (b *MockBroker) BrokerID() int32 {
+func (b *mockBroker) BrokerID() int32 {
 	return b.brokerID
 }
 
-func (b *MockBroker) Port() int32 {
+func (b *mockBroker) Port() int32 {
 	return b.port
 }
 
-func (b *MockBroker) Addr() string {
+func (b *mockBroker) Addr() string {
 	return b.listener.Addr().String()
 }
 
-func (b *MockBroker) Close() {
+func (b *mockBroker) Close() {
 	if len(b.expectations) > 0 {
 		b.t.Errorf("Not all expectations were satisfied in mockBroker with ID=%d! Still waiting on %d", b.BrokerID(), len(b.expectations))
 	}
@@ -61,7 +61,7 @@ func (b *MockBroker) Close() {
 	<-b.stopper
 }
 
-func (b *MockBroker) serverLoop() (ok bool) {
+func (b *mockBroker) serverLoop() (ok bool) {
 	var (
 		err  error
 		conn net.Conn
@@ -117,7 +117,7 @@ func (b *MockBroker) serverLoop() (ok bool) {
 	return true
 }
 
-func (b *MockBroker) serverError(err error, conn net.Conn) bool {
+func (b *mockBroker) serverError(err error, conn net.Conn) bool {
 	b.t.Error(err)
 	if conn != nil {
 		if err := conn.Close(); err != nil {
@@ -130,19 +130,19 @@ func (b *MockBroker) serverError(err error, conn net.Conn) bool {
 	return false
 }
 
-// NewMockBroker launches a fake Kafka broker. It takes a TestState (e.g. *testing.T) as provided by the
+// newMockBroker launches a fake Kafka broker. It takes a TestState (e.g. *testing.T) as provided by the
 // test framework and a channel of responses to use.  If an error occurs it is
 // simply logged to the TestState and the broker exits.
-func NewMockBroker(t TestState, brokerID int32) *MockBroker {
-	return NewMockBrokerAddr(t, brokerID, "localhost:0")
+func newMockBroker(t testState, brokerID int32) *mockBroker {
+	return newMockBrokerAddr(t, brokerID, "localhost:0")
 }
 
-// NewMockBrokerAddr behaves like NewMockBroker but listens on the address you give
+// newMockBrokerAddr behaves like newMockBroker but listens on the address you give
 // it rather than just some ephemeral port.
-func NewMockBrokerAddr(t TestState, brokerID int32, addr string) *MockBroker {
+func newMockBrokerAddr(t testState, brokerID int32, addr string) *mockBroker {
 	var err error
 
-	broker := &MockBroker{
+	broker := &mockBroker{
 		stopper:      make(chan bool),
 		t:            t,
 		brokerID:     brokerID,
@@ -168,6 +168,6 @@ func NewMockBrokerAddr(t TestState, brokerID int32, addr string) *MockBroker {
 	return broker
 }
 
-func (b *MockBroker) Returns(e encoder) {
+func (b *mockBroker) Returns(e encoder) {
 	b.expectations <- e
 }

+ 23 - 23
producer_test.go

@@ -36,8 +36,8 @@ func TestDefaultProducerConfigValidates(t *testing.T) {
 }
 
 func TestSimpleProducer(t *testing.T) {
-	seedBroker := NewMockBroker(t, 1)
-	leader := NewMockBroker(t, 2)
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
@@ -74,8 +74,8 @@ func TestSimpleProducer(t *testing.T) {
 }
 
 func TestConcurrentSimpleProducer(t *testing.T) {
-	seedBroker := NewMockBroker(t, 1)
-	leader := NewMockBroker(t, 2)
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
@@ -119,8 +119,8 @@ func TestConcurrentSimpleProducer(t *testing.T) {
 }
 
 func TestProducer(t *testing.T) {
-	seedBroker := NewMockBroker(t, 1)
-	leader := NewMockBroker(t, 2)
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
@@ -171,8 +171,8 @@ func TestProducer(t *testing.T) {
 }
 
 func TestProducerMultipleFlushes(t *testing.T) {
-	seedBroker := NewMockBroker(t, 1)
-	leader := NewMockBroker(t, 2)
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
@@ -224,9 +224,9 @@ func TestProducerMultipleFlushes(t *testing.T) {
 }
 
 func TestProducerMultipleBrokers(t *testing.T) {
-	seedBroker := NewMockBroker(t, 1)
-	leader0 := NewMockBroker(t, 2)
-	leader1 := NewMockBroker(t, 3)
+	seedBroker := newMockBroker(t, 1)
+	leader0 := newMockBroker(t, 2)
+	leader1 := newMockBroker(t, 3)
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
@@ -282,9 +282,9 @@ func TestProducerMultipleBrokers(t *testing.T) {
 }
 
 func TestProducerFailureRetry(t *testing.T) {
-	seedBroker := NewMockBroker(t, 1)
-	leader1 := NewMockBroker(t, 2)
-	leader2 := NewMockBroker(t, 3)
+	seedBroker := newMockBroker(t, 1)
+	leader1 := newMockBroker(t, 2)
+	leader2 := newMockBroker(t, 3)
 
 	metadataLeader1 := new(MetadataResponse)
 	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
@@ -360,8 +360,8 @@ func TestProducerFailureRetry(t *testing.T) {
 }
 
 func TestProducerBrokerBounce(t *testing.T) {
-	seedBroker := NewMockBroker(t, 1)
-	leader := NewMockBroker(t, 2)
+	seedBroker := newMockBroker(t, 1)
+	leader := newMockBroker(t, 2)
 	leaderAddr := leader.Addr()
 
 	metadataResponse := new(MetadataResponse)
@@ -387,7 +387,7 @@ func TestProducerBrokerBounce(t *testing.T) {
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
 	leader.Close()                               // producer should get EOF
-	leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
+	leader = newMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
 	seedBroker.Returns(metadataResponse)         // tell it to go to broker 2 again
 
 	prodSuccess := new(ProduceResponse)
@@ -414,9 +414,9 @@ func TestProducerBrokerBounce(t *testing.T) {
 }
 
 func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
-	seedBroker := NewMockBroker(t, 1)
-	leader1 := NewMockBroker(t, 2)
-	leader2 := NewMockBroker(t, 3)
+	seedBroker := newMockBroker(t, 1)
+	leader1 := newMockBroker(t, 2)
+	leader2 := newMockBroker(t, 3)
 
 	metadataLeader1 := new(MetadataResponse)
 	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
@@ -475,9 +475,9 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 }
 
 func TestProducerMultipleRetries(t *testing.T) {
-	seedBroker := NewMockBroker(t, 1)
-	leader1 := NewMockBroker(t, 2)
-	leader2 := NewMockBroker(t, 3)
+	seedBroker := newMockBroker(t, 1)
+	leader1 := newMockBroker(t, 2)
+	leader2 := newMockBroker(t, 3)
 
 	metadataLeader1 := new(MetadataResponse)
 	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())