Burke Libbey committed 11 years ago (commit fca2325bf4)

+ 4 - 7
broker_test.go

@@ -41,13 +41,11 @@ func TestBrokerAccessors(t *testing.T) {
 	}
 }
 
-/*
 func TestSimpleBrokerCommunication(t *testing.T) {
-	responses := make(chan []byte)
-	mockBroker := mockbroker.New(t, responses)
-	defer mockBroker.Close()
+	mb := NewMockBroker(t, 0)
+	defer mb.Close()
 
-	broker := NewBroker(mockBroker.Addr())
+	broker := NewBroker(mb.Addr())
 	err := broker.Open(4)
 	if err != nil {
 		t.Fatal(err)
@@ -55,7 +53,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
 
 	go func() {
 		for _, tt := range brokerTestTable {
-			responses <- tt.response
+			mb.ExpectBytes(tt.response)
 		}
 	}()
 	for _, tt := range brokerTestTable {
@@ -67,7 +65,6 @@ func TestSimpleBrokerCommunication(t *testing.T) {
 		t.Error(err)
 	}
 }
-*/
 
 // We're not testing encoding/decoding here, so most of the requests/responses will be empty for simplicity's sake
 var brokerTestTable = []struct {

+ 110 - 0
client_test.go

@@ -0,0 +1,110 @@
+package sarama
+
+import (
+	"testing"
+)
+
+func TestSimpleClient(t *testing.T) {
+
+	mb := NewMockBroker(t, 1)
+
+	mb.ExpectMetadataRequest()
+
+	client, err := NewClient("client_id", []string{mb.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+	defer mb.Close()
+}
+
+func TestClientExtraBrokers(t *testing.T) {
+
+	mb1 := NewMockBroker(t, 1)
+	mb2 := NewMockBroker(t, 2)
+
+	mb1.ExpectMetadataRequest().
+		AddBroker(mb2)
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+	defer mb1.Close()
+	defer mb2.Close()
+}
+
+func TestClientMetadata(t *testing.T) {
+
+	mb1 := NewMockBroker(t, 1)
+	mb5 := NewMockBroker(t, 5)
+
+	mb1.ExpectMetadataRequest().
+		AddBroker(mb5).
+		AddTopicPartition("my_topic", 0, mb5.BrokerID())
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+	defer mb1.Close()
+	defer mb5.Close()
+
+	topics, err := client.Topics()
+	if err != nil {
+		t.Error(err)
+	} else if len(topics) != 1 || topics[0] != "my_topic" {
+		t.Error("Client returned incorrect topics:", topics)
+	}
+
+	parts, err := client.Partitions("my_topic")
+	if err != nil {
+		t.Error(err)
+	} else if len(parts) != 1 || parts[0] != 0 {
+		t.Error("Client returned incorrect partitions for my_topic:", parts)
+	}
+
+	tst, err := client.Leader("my_topic", 0)
+	if err != nil {
+		t.Error(err)
+	} else if tst.ID() != 5 {
+		t.Error("Leader for my_topic had incorrect ID.")
+	}
+}
+
+func TestClientRefreshBehaviour(t *testing.T) {
+	mb1 := NewMockBroker(t, 1)
+	mb5 := NewMockBroker(t, 5)
+
+	mb1.ExpectMetadataRequest().
+		AddBroker(mb5)
+
+	mb5.ExpectMetadataRequest().
+		AddTopicPartition("my_topic", 0xb, 5)
+
+	client, err := NewClient("clientID", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+	defer mb1.Close()
+	defer mb5.Close()
+
+	parts, err := client.Partitions("my_topic")
+	if err != nil {
+		t.Error(err)
+	} else if len(parts) != 1 || parts[0] != 0xb {
+		t.Error("Client returned incorrect partitions for my_topic:", parts)
+	}
+
+	tst, err := client.Leader("my_topic", 0xb)
+	if err != nil {
+		t.Error(err)
+	} else if tst.ID() != 5 {
+		t.Error("Leader for my_topic had incorrect ID.")
+	}
+
+	client.disconnectBroker(tst)
+}

+ 4 - 1
consumer.go

@@ -26,7 +26,8 @@ type ConsumerConfig struct {
 	// treated as no limit.
 	MaxMessageSize int32
 	// The maximum amount of time (in ms) the broker will wait for MinFetchSize bytes to become available before it
-	// returns fewer than that anyways. The default of 0 is treated as no limit.
+	// returns fewer than that anyway. The default of 0 causes Kafka to return immediately, which is rarely desirable
+	// as it causes the Consumer to spin when no events are available. 100-500ms is a reasonable range for most cases.
 	MaxWaitTime int32
 
 	// The method used to determine at which offset to begin consuming messages.
@@ -90,6 +91,8 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
 
 	if config.MaxWaitTime < 0 {
 		return nil, ConfigurationError("Invalid MaxWaitTime")
+	} else if config.MaxWaitTime < 100 {
+		Logger.Println("ConsumerConfig.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
 	}
 
 	if config.EventBufferSize < 0 {

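The new warning nudges callers toward an explicit MaxWaitTime rather than the zero default. A minimal sketch of such a setup, reusing the NewClient/NewConsumer signatures exercised by the tests in this commit; the 250ms figure is an arbitrary pick from the suggested 100-500ms range:

package main

import "github.com/shopify/sarama"

func main() {
	client, err := sarama.NewClient("my_client", []string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	// An explicit MaxWaitTime in the 100-500ms range keeps the consumer
	// from busy-spinning against an empty partition.
	config := &sarama.ConsumerConfig{MaxWaitTime: 250}
	consumer, err := sarama.NewConsumer(client, "my_topic", 0, "my_consumer_group", config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()
}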
+ 169 - 0
consumer_test.go

@@ -0,0 +1,169 @@
+package sarama
+
+import (
+	"fmt"
+	"testing"
+	"time"
+)
+
+var (
+	consumerStopper = []byte{
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00,
+	}
+	extraBrokerMetadata = []byte{
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x09, 'l', 'o', 'c', 'a', 'l', 'h', 'o', 's', 't',
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00,
+		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c',
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x01,
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00,
+	}
+)
+
+func TestSimpleConsumer(t *testing.T) {
+
+	mb1 := NewMockBroker(t, 1)
+	mb2 := NewMockBroker(t, 2)
+
+	mb1.ExpectMetadataRequest().
+		AddBroker(mb2).
+		AddTopicPartition("my_topic", 0, 2)
+
+	for i := 0; i < 10; i++ {
+		mb2.ExpectFetchRequest().
+			AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), uint64(i))
+	}
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer consumer.Close()
+	defer mb1.Close()
+	defer mb2.Close()
+
+	for i := 0; i < 10; i++ {
+		event := <-consumer.Events()
+		if event.Err != nil {
+			t.Error(event.Err)
+		}
+		if event.Offset != int64(i) {
+			t.Error("Incorrect message offset!")
+		}
+	}
+
+}
+
+func TestConsumerRawOffset(t *testing.T) {
+
+	mb1 := NewMockBroker(t, 1)
+	mb2 := NewMockBroker(t, 2)
+
+	mb1.ExpectMetadataRequest().
+		AddBroker(mb2).
+		AddTopicPartition("my_topic", 0, 2)
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", &ConsumerConfig{OffsetMethod: OffsetMethodManual, OffsetValue: 1234})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer consumer.Close()
+
+	defer mb1.Close()
+	defer mb2.Close()
+
+	if consumer.offset != 1234 {
+		t.Error("Raw offset not set correctly")
+	}
+}
+
+func TestConsumerLatestOffset(t *testing.T) {
+
+	mb1 := NewMockBroker(t, 1)
+	mb2 := NewMockBroker(t, 2)
+
+	mb1.ExpectMetadataRequest().
+		AddBroker(mb2).
+		AddTopicPartition("my_topic", 0, 2)
+
+	mb2.ExpectOffsetFetchRequest().
+		AddTopicPartition("my_topic", 0, 0x010101)
+
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", &ConsumerConfig{OffsetMethod: OffsetMethodNewest})
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer consumer.Close()
+
+	defer mb2.Close()
+	defer mb1.Close()
+
+	if consumer.offset != 0x010101 {
+		t.Error("Latest offset not fetched correctly")
+	}
+}
+
+func ExampleConsumer() {
+	client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
+	if err != nil {
+		panic(err)
+	} else {
+		fmt.Println("> connected")
+	}
+	defer client.Close()
+
+	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", nil)
+	if err != nil {
+		panic(err)
+	} else {
+		fmt.Println("> consumer ready")
+	}
+	defer consumer.Close()
+
+	msgCount := 0
+consumerLoop:
+	for {
+		select {
+		case event := <-consumer.Events():
+			if event.Err != nil {
+				panic(event.Err)
+			}
+			msgCount++
+		case <-time.After(5 * time.Second):
+			fmt.Println("> timed out")
+			break consumerLoop
+		}
+	}
+	fmt.Println("Got", msgCount, "messages.")
+}

+ 135 - 0
fetch_request_expectation.go

@@ -0,0 +1,135 @@
+package sarama
+
+import (
+	"bytes"
+	"encoding/binary"
+	"hash/crc32"
+)
+
+type FetchRequestExpectation struct {
+	messages []fetchResponseMessage
+}
+
+type encoder interface {
+	Encode() ([]byte, error)
+}
+
+type fetchResponseMessage struct {
+	topic      string
+	partition  int32
+	key, value encoder
+	offset     uint64
+}
+
+func (e *FetchRequestExpectation) AddMessage(
+	topic string, partition int32, key, value encoder, offset uint64,
+) *FetchRequestExpectation {
+	e.messages = append(e.messages, fetchResponseMessage{
+		topic:     topic,
+		partition: partition,
+		key:       key,
+		value:     value,
+		offset:    offset,
+	})
+	return e
+}
+
+func (b *MockBroker) ExpectFetchRequest() *FetchRequestExpectation {
+	e := &FetchRequestExpectation{}
+	b.expectations <- e
+	return e
+}
+
+func (e *FetchRequestExpectation) ResponseBytes() []byte {
+	buf := new(bytes.Buffer)
+
+	byTopic := make(map[string][]fetchResponseMessage)
+	for _, frm := range e.messages {
+		byTopic[frm.topic] = append(byTopic[frm.topic], frm)
+	}
+
+	binary.Write(buf, binary.BigEndian, uint32(len(byTopic)))
+	for topic, messages := range byTopic {
+		binary.Write(buf, binary.BigEndian, uint16(len(topic)))
+		buf.Write([]byte(topic))
+
+		byPartition := make(map[int32][]fetchResponseMessage)
+		for _, frm := range messages {
+			byPartition[frm.partition] = append(byPartition[frm.partition], frm)
+		}
+
+		binary.Write(buf, binary.BigEndian, uint32(len(byPartition)))
+
+		for partition, messages := range byPartition {
+			binary.Write(buf, binary.BigEndian, uint32(partition))
+			binary.Write(buf, binary.BigEndian, uint16(0)) // error
+			binary.Write(buf, binary.BigEndian, uint64(0)) // high water mark offset
+
+			messageSetBuffer := new(bytes.Buffer)
+
+			var maxOffset uint64
+
+			for _, msg := range messages {
+				chunk := new(bytes.Buffer)
+
+				binary.Write(chunk, binary.BigEndian, uint8(0)) // format
+				binary.Write(chunk, binary.BigEndian, uint8(0)) // attribute
+
+				if msg.offset > maxOffset {
+					maxOffset = msg.offset
+				}
+
+				if msg.key == nil {
+					binary.Write(chunk, binary.BigEndian, int32(-1))
+				} else {
+					bytes, _ := msg.key.Encode()
+					binary.Write(chunk, binary.BigEndian, int32(len(bytes)))
+					chunk.Write(bytes)
+				}
+
+				if msg.value == nil {
+					binary.Write(chunk, binary.BigEndian, int32(-1))
+				} else {
+					bytes, _ := msg.value.Encode()
+					binary.Write(chunk, binary.BigEndian, int32(len(bytes)))
+					chunk.Write(bytes)
+				}
+
+				cksum := crc32.ChecksumIEEE(chunk.Bytes())
+				length := len(chunk.Bytes()) + 4
+
+				binary.Write(messageSetBuffer, binary.BigEndian, uint32(length)) // message length
+				binary.Write(messageSetBuffer, binary.BigEndian, uint32(cksum))  // CRC
+				messageSetBuffer.Write(chunk.Bytes())
+			}
+
+			binary.Write(buf, binary.BigEndian, uint32(len(messageSetBuffer.Bytes())+8)) // msgSet size
+			binary.Write(buf, binary.BigEndian, uint64(maxOffset))                       // offset
+			buf.Write(messageSetBuffer.Bytes())
+
+		}
+
+	}
+
+	/*
+		sample response:
+
+		0x00, 0x00, 0x00, 0x01, // number of topics
+		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of blocks for this topic
+		0x00, 0x00, 0x00, 0x00, // partition id
+		0x00, 0x00, // error
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // high water mark offset
+		// messageSet
+		0x00, 0x00, 0x00, 0x1C, // messageset size
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // offset
+		// message
+		0x00, 0x00, 0x00, 0x10, // length of message (?)
+		0x23, 0x96, 0x4a, 0xf7, // CRC32
+		0x00, // format
+		0x00, // attribute (compression)
+		0xFF, 0xFF, 0xFF, 0xFF, // key (nil)
+		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE, // value
+	*/
+	return buf.Bytes()
+}

+ 36 - 0
fetch_request_expectation_test.go

@@ -0,0 +1,36 @@
+package sarama
+
+import (
+	"bytes"
+	"testing"
+)
+
+func TestFetchRequestSerialization(t *testing.T) {
+
+	exp := new(FetchRequestExpectation).
+		AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0xEE}), 3)
+
+	expected := []byte{
+		0x00, 0x00, 0x00, 0x01, // number of topics
+		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of blocks for this topic
+		0x00, 0x00, 0x00, 0x00, // partition id
+		0x00, 0x00, // error
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // high water mark offset
+		// messageSet
+		0x00, 0x00, 0x00, 0x1C, // messageset size
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, // offset
+		// message
+		0x00, 0x00, 0x00, 0x10, // length of message (?)
+		0x23, 0x96, 0x4a, 0xf7, // CRC32
+		0x00,                   // format
+		0x00,                   // attribute (compression)
+		0xFF, 0xFF, 0xFF, 0xFF, // key (nil)
+		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE, // value
+	}
+
+	actual := exp.ResponseBytes()
+	if !bytes.Equal(actual, expected) {
+		t.Error("\nExpected\n", expected, "\nbut got\n", actual)
+	}
+}

+ 5 - 2
message.go

@@ -10,6 +10,9 @@ import (
 // CompressionCodec represents the various compression codecs recognized by Kafka in messages.
 type CompressionCodec int8
 
+// only the last two bits are really used
+const compressionCodecMask int8 = 0x03
+
 const (
 	CompressionNone   CompressionCodec = 0
 	CompressionGZIP   CompressionCodec = 1
@@ -33,7 +36,7 @@ func (m *Message) encode(pe packetEncoder) error {
 
 	pe.putInt8(messageFormat)
 
-	attributes := int8(m.Codec) & 0x07
+	attributes := int8(m.Codec) & compressionCodecMask
 	pe.putInt8(attributes)
 
 	err := pe.putBytes(m.Key)
@@ -95,7 +98,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 	if err != nil {
 		return err
 	}
-	m.Codec = CompressionCodec(attribute & 0x07)
+	m.Codec = CompressionCodec(attribute & compressionCodecMask)
 
 	m.Key, err = pd.getBytes()
 	if err != nil {

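For context on the mask change: Kafka's attribute byte reserves only its low two bits for the compression codec, so the old three-bit 0x07 mask could absorb an unrelated bit. A standalone illustration; the 0x06 attribute value is hypothetical, chosen to make the two masks disagree:

package main

import "fmt"

const compressionCodecMask int8 = 0x03 // only the low two bits carry the codec

func main() {
	attribute := int8(0x06) // hypothetical: codec bits 0b10 plus a stray third bit
	fmt.Println(attribute & 0x07)                 // 6: not a valid CompressionCodec
	fmt.Println(attribute & compressionCodecMask) // 2: CompressionSnappy
}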
+ 1 - 1
mockbroker/metadata_request_expectation.go → metadata_request_expectation.go

@@ -1,4 +1,4 @@
-package mockbroker
+package sarama
 
 import (
 	"bytes"

+ 1 - 1
mockbroker/metadata_request_expectation_test.go → metadata_request_expectation_test.go

@@ -1,4 +1,4 @@
-package mockbroker
+package sarama
 
 import (
 	"bytes"

+ 15 - 2
mockbroker/mockbroker.go → mockbroker.go

@@ -1,4 +1,4 @@
-package mockbroker
+package sarama
 
 import (
 	"encoding/binary"
@@ -45,6 +45,16 @@ func (b *MockBroker) Addr() string {
 	return b.listener.Addr().String()
 }
 
+type rawExpectation []byte
+
+func (r rawExpectation) ResponseBytes() []byte {
+	return r
+}
+
+func (b *MockBroker) ExpectBytes(bytes []byte) {
+	b.expectations <- rawExpectation(bytes)
+}
+
 func (b *MockBroker) Close() {
 	if b.expecting {
 		b.t.Fatalf("Not all expectations were satisfied in mockBroker with ID=%d!", b.BrokerID())
@@ -80,6 +90,9 @@ func (b *MockBroker) serverLoop() (ok bool) {
 			return b.serverError(err, conn)
 		}
 		response := expectation.ResponseBytes()
+		if len(response) == 0 {
+			continue
+		}
 
 		binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
 		binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
@@ -112,7 +125,7 @@ func (b *MockBroker) serverError(err error, conn net.Conn) bool {
 // NewMockBroker launches a fake Kafka broker. It takes a testing.T as provided
 // by the test framework and a broker ID to use. If an error occurs it is
 // simply logged to the testing.T and the broker exits.
-func New(t *testing.T, brokerID int) *MockBroker {
+func NewMockBroker(t *testing.T, brokerID int) *MockBroker {
 	var err error
 
 	broker := &MockBroker{

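rawExpectation spells out the implicit contract the mock broker runs on: anything sent down the expectations channel needs only a ResponseBytes() []byte method, which serverLoop calls to produce the next canned reply. A hypothetical in-package extension under that assumption; cannedError and ExpectCannedError are illustrative, not part of this commit:

package sarama

import "encoding/binary"

// cannedError is a hypothetical expectation that replies with a single
// big-endian error code; like rawExpectation, it satisfies the implicit
// ResponseBytes() []byte contract that serverLoop relies on.
type cannedError uint16

func (c cannedError) ResponseBytes() []byte {
	buf := make([]byte, 2)
	binary.BigEndian.PutUint16(buf, uint16(c))
	return buf
}

func (b *MockBroker) ExpectCannedError(code uint16) {
	b.expectations <- cannedError(code)
}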
+ 65 - 0
offset_fetch_request_expectation.go

@@ -0,0 +1,65 @@
+package sarama
+
+import (
+	"bytes"
+	"encoding/binary"
+)
+
+type OffsetFetchRequestExpectation struct {
+	topicPartitions []offsetFetchRequestTP
+}
+
+type offsetFetchRequestTP struct {
+	topic     string
+	partition int32
+	offset    uint64
+}
+
+func (e *OffsetFetchRequestExpectation) AddTopicPartition(
+	topic string, partition int32, offset uint64,
+) *OffsetFetchRequestExpectation {
+	ofrtp := offsetFetchRequestTP{topic, partition, offset}
+	e.topicPartitions = append(e.topicPartitions, ofrtp)
+	return e
+}
+
+func (b *MockBroker) ExpectOffsetFetchRequest() *OffsetFetchRequestExpectation {
+	e := &OffsetFetchRequestExpectation{}
+	b.expectations <- e
+	return e
+}
+
+func (e *OffsetFetchRequestExpectation) ResponseBytes() []byte {
+	buf := new(bytes.Buffer)
+
+	byTopic := make(map[string][]offsetFetchRequestTP)
+	for _, ofrtp := range e.topicPartitions {
+		byTopic[ofrtp.topic] = append(byTopic[ofrtp.topic], ofrtp)
+	}
+
+	binary.Write(buf, binary.BigEndian, uint32(len(byTopic)))
+	for topic, tps := range byTopic {
+		binary.Write(buf, binary.BigEndian, uint16(len(topic)))
+		buf.Write([]byte(topic))
+		binary.Write(buf, binary.BigEndian, uint32(len(tps)))
+		for _, tp := range tps {
+			binary.Write(buf, binary.BigEndian, uint32(tp.partition))
+			binary.Write(buf, binary.BigEndian, uint16(0)) // error
+			binary.Write(buf, binary.BigEndian, uint32(1))
+			binary.Write(buf, binary.BigEndian, uint64(tp.offset)) // offset
+		}
+	}
+
+	/*
+		sample response:
+
+		0x00, 0x00, 0x00, 0x01, // number of topics
+		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of blocks for this topic
+		0x00, 0x00, 0x00, 0x00, // partition id
+		0x00, 0x00, // error
+		0x00, 0x00, 0x00, 0x01, // number of offsets
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, // offset
+	*/
+	return buf.Bytes()
+}

+ 27 - 0
offset_fetch_request_expectation_test.go

@@ -0,0 +1,27 @@
+package sarama
+
+import (
+	"bytes"
+	"testing"
+)
+
+func TestOffsetFetchRequestSerialization(t *testing.T) {
+
+	exp := new(OffsetFetchRequestExpectation).
+		AddTopicPartition("my_topic", 0, 0x010101)
+
+	expected := []byte{
+		0x00, 0x00, 0x00, 0x01, // number of topics
+		0x00, 0x08, 'm', 'y', '_', 't', 'o', 'p', 'i', 'c', // topic name
+		0x00, 0x00, 0x00, 0x01, // number of blocks for this topic
+		0x00, 0x00, 0x00, 0x00, // partition id
+		0x00, 0x00, // error
+		0x00, 0x00, 0x00, 0x01, // number of offsets
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x01, 0x01, // offset
+	}
+
+	actual := exp.ResponseBytes()
+	if !bytes.Equal(actual, expected) {
+		t.Error("\nExpected\n", expected, "\nbut got\n", actual)
+	}
+}

+ 5 - 5
mockbroker/produce_request_expectation.go → produce_request_expectation.go

@@ -1,4 +1,4 @@
-package mockbroker
+package sarama
 
 import (
 	"bytes"
@@ -14,11 +14,11 @@ type produceRequestTP struct {
 	topic     string
 	partition int32
 	nMessages int
-	err       error
+	err       KError
 }
 
 func (e *ProduceRequestExpectation) AddTopicPartition(
-	topic string, partition int32, nMessages int, err error,
+	topic string, partition int32, nMessages int, err KError,
 ) *ProduceRequestExpectation {
 	prtp := produceRequestTP{topic, partition, nMessages, err}
 	e.topicPartitions = append(e.topicPartitions, prtp)
@@ -46,8 +46,8 @@ func (e *ProduceRequestExpectation) ResponseBytes() []byte {
 		binary.Write(buf, binary.BigEndian, uint32(len(tps)))
 		for _, tp := range tps {
 			binary.Write(buf, binary.BigEndian, uint32(tp.partition))
-			binary.Write(buf, binary.BigEndian, uint16(0)) // TODO: error
-			binary.Write(buf, binary.BigEndian, uint64(0)) // offset
+			binary.Write(buf, binary.BigEndian, uint16(tp.err)) // error
+			binary.Write(buf, binary.BigEndian, uint64(0))      // offset
 		}
 	}
 

+ 1 - 1
mockbroker/produce_request_expectation_test.go → produce_request_expectation_test.go

@@ -1,4 +1,4 @@
-package mockbroker
+package sarama
 
 import (
 	"bytes"

+ 10 - 10
producer.go

@@ -8,20 +8,20 @@ import (
 
 // ProducerConfig is used to pass multiple configuration options to NewProducer.
 //
-// If MaxBufferTime=MaxBufferBytes=0, messages will be delivered immediately and
+// If MaxBufferTime=MaxBufferedBytes=0, messages will be delivered immediately and
 // constantly, but if multiple messages are received while a roundtrip to Kafka
 // is in progress, they will all be combined into the next request. In this
 // mode, errors are not returned from SendMessage, but over the Errors()
 // channel.
 //
-// With MaxBufferTime and/or MaxBufferBytes set to values > 0, sarama will
+// With MaxBufferTime and/or MaxBufferedBytes set to values > 0, sarama will
 // buffer messages before sending, to reduce traffic.
 type ProducerConfig struct {
 	Partitioner        Partitioner      // Chooses the partition to send messages to, or randomly if this is nil.
 	RequiredAcks       RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
 	Timeout            int32            // The maximum time in ms the broker will wait for receipt of the number of RequiredAcks.
 	Compression        CompressionCodec // The type of compression to use on messages (defaults to no compression).
-	MaxBufferBytes     uint32           // The maximum number of bytes to buffer per-broker before sending to Kafka.
+	MaxBufferedBytes   uint32           // The maximum number of bytes to buffer per-broker before sending to Kafka.
 	MaxBufferTime      uint32           // The maximum number of milliseconds to buffer messages before sending to a broker.
 	MaxDeliveryRetries uint32           // The number of times to retry a failed message. You should always specify at least 1.
 }
@@ -33,7 +33,7 @@ type ProducerConfig struct {
 // scope (this is in addition to calling Close on the underlying client, which
 // is still necessary).
 //
-// If MaxBufferBytes=0 and MaxBufferTime=0, the Producer is considered to be
+// If MaxBufferedBytes=0 and MaxBufferTime=0, the Producer is considered to be
 // operating in "synchronous" mode. This means that errors will be returned
 // directly from calls to SendMessage. If either value is greater than zero, the
 // Producer is operating in "asynchronous" mode, and you must read these return
@@ -88,8 +88,8 @@ func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
 		config.Partitioner = NewRandomPartitioner()
 	}
 
-	if config.MaxBufferBytes == 0 {
-		config.MaxBufferBytes = 1
+	if config.MaxBufferedBytes == 0 {
+		config.MaxBufferedBytes = 1
 	}
 
 	return &Producer{
@@ -125,7 +125,7 @@ func (p *Producer) Close() error {
 // strings as either key or value, see the StringEncoder type.
 //
 // QueueMessage uses buffering semantics to reduce the number of requests to the
-// broker. The buffer logic is tunable with config.MaxBufferBytes and
+// broker. The buffer logic is tunable with config.MaxBufferedBytes and
 // config.MaxBufferTime.
 //
 // QueueMessage will return an error if it's unable to construct the message
@@ -189,7 +189,7 @@ func (p *Producer) addMessage(msg *produceMessage) error {
 	if err != nil {
 		return err
 	}
-	bp.addMessage(msg, p.config.MaxBufferBytes)
+	bp.addMessage(msg, p.config.MaxBufferedBytes)
 	return nil
 }
 
@@ -236,9 +236,7 @@ func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
 		for {
 			select {
 			case <-bp.flushNow:
-				println("{{")
 				bp.flush(p)
-				println("}}")
 			case <-timer.C:
 				bp.flushIfAnyMessages(p)
 			case <-bp.stopper:
@@ -318,6 +316,8 @@ func (bp *brokerProducer) flush(p *Producer) {
 		bp.bufferedBytes -= prb.byteSize()
 		bp.mapM.Unlock()
 
+		// TODO: Compression probably discards messages because they need to be wrapped in a MessageSet or something.
+
 		bp.flushRequest(p, prb, func(err error) {
 			p.errors <- err
 		})

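With the rename in place, an asynchronous-mode producer reads roughly as below: both buffering knobs are nonzero, so per the doc comment delivery errors must be drained from the Errors() channel rather than checked on each send. Errors() is assumed here to return a receive-only error channel, and the threshold values are arbitrary:

package main

import (
	"log"

	"github.com/shopify/sarama"
)

func main() {
	client, err := sarama.NewClient("my_client", []string{"localhost:9092"}, nil)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	producer, err := sarama.NewProducer(client, &sarama.ProducerConfig{
		RequiredAcks:       sarama.WaitForLocal,
		MaxBufferTime:      500,       // ms: flush at least every half-second...
		MaxBufferedBytes:   64 * 1024, // ...or once 64KiB is buffered
		MaxDeliveryRetries: 1,
	})
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	// In asynchronous mode, delivery failures arrive on this channel
	// instead of being returned from the send call.
	go func() {
		for err := range producer.Errors() {
			log.Println("produce error:", err)
		}
	}()
}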
+ 28 - 28
producer_test.go

@@ -2,7 +2,6 @@ package sarama
 
 import (
 	"fmt"
-	"github.com/shopify/sarama/mockbroker"
 	"testing"
 	"time"
 )
@@ -11,17 +10,17 @@ const TestMessage = "ABC THE MESSAGE"
 
 func TestSimpleProducer(t *testing.T) {
 
-	mb1 := mockbroker.New(t, 1)
-	mb2 := mockbroker.New(t, 2)
+	mb1 := NewMockBroker(t, 1)
+	mb2 := NewMockBroker(t, 2)
 	defer mb1.Close()
 	defer mb2.Close()
 
 	mb1.ExpectMetadataRequest().
 		AddBroker(mb2).
-		AddTopicPartition("my_topic", 1, 2)
+		AddTopicPartition("my_topic", 0, 2)
 
 	mb2.ExpectProduceRequest().
-		AddTopicPartition("my_topic", 1, 10, nil)
+		AddTopicPartition("my_topic", 0, 1, nil)
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
@@ -32,7 +31,7 @@ func TestSimpleProducer(t *testing.T) {
 		RequiredAcks:  WaitForLocal,
 		MaxBufferTime: 1000000, // "never"
 		// So that we flush once, after the 10th message.
-		MaxBufferBytes: uint32((len(TestMessage) * 10) - 1),
+		MaxBufferedBytes: uint32((len(TestMessage) * 10) - 1),
 	})
 	defer producer.Close()
 
@@ -45,8 +44,8 @@ func TestSimpleProducer(t *testing.T) {
 
 func TestSimpleSyncProducer(t *testing.T) {
 
-	mb1 := mockbroker.New(t, 1)
-	mb2 := mockbroker.New(t, 2)
+	mb1 := NewMockBroker(t, 1)
+	mb2 := NewMockBroker(t, 2)
 	defer mb1.Close()
 	defer mb2.Close()
 
@@ -68,7 +67,7 @@ func TestSimpleSyncProducer(t *testing.T) {
 		RequiredAcks:  WaitForLocal,
 		MaxBufferTime: 1000000, // "never"
 		// So that we flush once, after the 10th message.
-		MaxBufferBytes: uint32((len(TestMessage) * 10) - 1),
+		MaxBufferedBytes: uint32((len(TestMessage) * 10) - 1),
 	})
 	defer producer.Close()
 
@@ -78,10 +77,9 @@ func TestSimpleSyncProducer(t *testing.T) {
 }
 
 func TestMultipleFlushes(t *testing.T) {
-	t.Fatal("pending")
 
-	mb1 := mockbroker.New(t, 1)
-	mb2 := mockbroker.New(t, 2)
+	mb1 := NewMockBroker(t, 1)
+	mb2 := NewMockBroker(t, 2)
 	defer mb1.Close()
 	defer mb2.Close()
 
@@ -93,6 +91,10 @@ func TestMultipleFlushes(t *testing.T) {
 		AddTopicPartition("my_topic", 0, 1, nil).
 		AddTopicPartition("my_topic", 0, 1, nil)
 
+	mb2.ExpectProduceRequest().
+		AddTopicPartition("my_topic", 0, 1, nil).
+		AddTopicPartition("my_topic", 0, 1, nil)
+
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
@@ -102,7 +104,7 @@ func TestMultipleFlushes(t *testing.T) {
 		RequiredAcks:  WaitForLocal,
 		MaxBufferTime: 1000000, // "never"
 		// So that we flush after every 5th message.
-		MaxBufferBytes: uint32((len(TestMessage) * 5) - 1),
+		MaxBufferedBytes: uint32((len(TestMessage) * 5) - 1),
 	})
 	defer producer.Close()
 
@@ -113,11 +115,10 @@ func TestMultipleFlushes(t *testing.T) {
 }
 
 func TestMultipleProducer(t *testing.T) {
-	t.Fatal("pending")
 
-	mb1 := mockbroker.New(t, 1)
-	mb2 := mockbroker.New(t, 2)
-	mb3 := mockbroker.New(t, 3)
+	mb1 := NewMockBroker(t, 1)
+	mb2 := NewMockBroker(t, 2)
+	mb3 := NewMockBroker(t, 3)
 	defer mb1.Close()
 	defer mb2.Close()
 	defer mb3.Close()
@@ -125,9 +126,9 @@ func TestMultipleProducer(t *testing.T) {
 	mb1.ExpectMetadataRequest().
 		AddBroker(mb2).
 		AddBroker(mb3).
-		AddTopicPartition("topic_a", 0, 1).
-		AddTopicPartition("topic_b", 0, 2).
-		AddTopicPartition("topic_c", 0, 2)
+		AddTopicPartition("topic_a", 0, 2).
+		AddTopicPartition("topic_b", 0, 3).
+		AddTopicPartition("topic_c", 0, 3)
 
 	mb2.ExpectProduceRequest().
 		AddTopicPartition("topic_a", 0, 1, nil)
@@ -145,7 +146,7 @@ func TestMultipleProducer(t *testing.T) {
 		RequiredAcks:  WaitForLocal,
 		MaxBufferTime: 1000000, // "never"
 		// So that we flush once, after the 10th message.
-		MaxBufferBytes: uint32((len(TestMessage) * 10) - 1),
+		MaxBufferedBytes: uint32((len(TestMessage) * 10) - 1),
 	})
 	defer producer.Close()
 
@@ -173,10 +174,9 @@ func TestMultipleProducer(t *testing.T) {
 // happens correctly; that is, the first messages are retried before the next
 // batch is allowed to submit.
 func TestFailureRetry(t *testing.T) {
-	t.Fatal("pending")
 
-	mb1 := mockbroker.New(t, 1)
-	mb2 := mockbroker.New(t, 2)
+	mb1 := NewMockBroker(t, 1)
+	mb2 := NewMockBroker(t, 2)
 	defer mb1.Close()
 	defer mb2.Close()
 
@@ -188,15 +188,15 @@ func TestFailureRetry(t *testing.T) {
 
 	mb2.ExpectProduceRequest().
 		AddTopicPartition("topic_b", 0, 1, NotLeaderForPartition).
-		AddTopicPartition("topic_c", 0, 1, nil)
+		AddTopicPartition("topic_c", 0, 1, NoError)
 
 	mb1.ExpectMetadataRequest().
 		AddBroker(mb2).
 		AddTopicPartition("topic_b", 0, 1)
 
 	mb1.ExpectProduceRequest().
-		AddTopicPartition("topic_a", 0, 1, nil).
-		AddTopicPartition("topic_b", 0, 1, nil)
+		AddTopicPartition("topic_a", 0, 1, NoError).
+		AddTopicPartition("topic_b", 0, 1, NoError)
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
@@ -208,7 +208,7 @@ func TestFailureRetry(t *testing.T) {
 		RequiredAcks:  WaitForLocal,
 		MaxBufferTime: 1000000, // "never"
 		// So that we flush after the 2nd message.
-		MaxBufferBytes:     uint32((len(TestMessage) * 2) - 1),
+		MaxBufferedBytes:   uint32((len(TestMessage) * 2) - 1),
 		MaxDeliveryRetries: 1,
 	})
 	if err != nil {