
wip remove namespacing

Evan Huus, 12 years ago
commit be4d7628e5
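For context: this work-in-progress commit collapses the old sarama sub-packages (protocol, encoding, types, mock) into a single flat kafka package, unexporting the encoding helpers along the way (Encode/Decode become encode/decode, PutInt32 becomes putInt32, and so on). A minimal sketch of what calling code looks like after the change, assuming the flattened package is imported as "kafka" (the exact import path is not shown in this diff):

package main

import "kafka" // import path assumed; the diff only shows `package kafka`

func main() {
	// Previously protocol.ProduceRequest and types.NO_RESPONSE; now both
	// identifiers live in the one package.
	request := kafka.ProduceRequest{}
	request.RequiredAcks = kafka.NO_RESPONSE

	// NewBroker(host, port) as used in broker_test.go below.
	broker := kafka.NewBroker("localhost", 9092)
	_ = broker
}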

+ 11 - 25
protocol/broker.go → broker.go

@@ -1,19 +1,5 @@
-/*
-Package protocol provides the low-level primitives necessary for communicating with a Kafka 0.8 cluster.
+package kafka
 
-The core of the package is the Broker. It represents a connection to a single Kafka broker service, and
-has methods for querying the broker.
-
-The other types are mostly Request types or Response types. Most of the Broker methods take a Request of a
-specific type and return a Response of the appropriate type.
-
-The objects and properties in this package are mostly undocumented, as they line up exactly with the
-protocol fields documented by Kafka at https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
-*/
-package protocol
-
-import enc "sarama/encoding"
-import "sarama/types"
 import (
 	"io"
 	"net"
@@ -143,7 +129,7 @@ func (b *Broker) Produce(clientID string, request *ProduceRequest) (*ProduceResp
 	var response *ProduceResponse
 	var err error
 
-	if request.RequiredAcks == types.NO_RESPONSE {
+	if request.RequiredAcks == NO_RESPONSE {
 		err = b.sendAndReceive(clientID, request, nil)
 	} else {
 		response = new(ProduceResponse)
@@ -202,7 +188,7 @@ func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool)
 	}
 
 	fullRequest := request{b.correlation_id, clientID, req}
-	buf, err := enc.Encode(&fullRequest)
+	buf, err := encode(&fullRequest)
 	if err != nil {
 		return nil, err
 	}
@@ -223,7 +209,7 @@ func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool)
 	return &promise, nil
 }
 
-func (b *Broker) sendAndReceive(clientID string, req requestEncoder, res enc.Decoder) error {
+func (b *Broker) sendAndReceive(clientID string, req requestEncoder, res decoder) error {
 	promise, err := b.send(clientID, req, res != nil)
 
 	if err != nil {
@@ -236,24 +222,24 @@ func (b *Broker) sendAndReceive(clientID string, req requestEncoder, res enc.Dec
 
 	select {
 	case buf := <-promise.packets:
-		return enc.Decode(buf, res)
+		return decode(buf, res)
 	case err = <-promise.errors:
 		return err
 	}
 }
 
-func (b *Broker) Decode(pd enc.PacketDecoder) (err error) {
-	b.id, err = pd.GetInt32()
+func (b *Broker) decode(pd packetDecoder) (err error) {
+	b.id, err = pd.getInt32()
 	if err != nil {
 		return err
 	}
 
-	b.host, err = pd.GetString()
+	b.host, err = pd.getString()
 	if err != nil {
 		return err
 	}
 
-	b.port, err = pd.GetInt32()
+	b.port, err = pd.getInt32()
 	if err != nil {
 		return err
 	}
@@ -271,13 +257,13 @@ func (b *Broker) responseReceiver() {
 		}
 
 		decodedHeader := responseHeader{}
-		err = enc.Decode(header, &decodedHeader)
+		err = decode(header, &decodedHeader)
 		if err != nil {
 			response.errors <- err
 			continue
 		}
 		if decodedHeader.correlation_id != response.correlation_id {
-			response.errors <- enc.DecodingError
+			response.errors <- DecodingError
 			continue
 		}
 

+ 18 - 14
protocol/broker_test.go → broker_test.go

@@ -1,12 +1,16 @@
-package protocol
+package kafka
 
 import (
+	"encoding/binary"
 	"fmt"
-	"sarama/mock"
-	"sarama/types"
+	"io"
+	"net"
+	"strconv"
 	"testing"
+	"time"
 )
-// Broker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
+
+// MockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
 // accepts a single connection. It reads Kafka requests from that connection and returns each response
 // from the channel provided at creation-time (if a response has a len of 0, nothing is sent; if a response
 // is nil, the server sleeps for 250ms instead of reading a request).
@@ -16,7 +20,7 @@ import (
 //
 // It is not necessary to prefix the message length or correlation ID to your response bytes; the server does
 // that automatically as a convenience.
-type Broker struct {
+type MockBroker struct {
 	port      int32
 	stopper   chan bool
 	responses chan []byte
@@ -25,18 +29,18 @@ type Broker struct {
 }
 
 // Port is the kernel-selected TCP port the broker is listening on.
-func (b *Broker) Port() int32 {
+func (b *MockBroker) Port() int32 {
 	return b.port
 }
 
 // Close closes the response channel originally provided, then waits to make sure
 // that all requests/responses matched up before exiting.
-func (b *Broker) Close() {
+func (b *MockBroker) Close() {
 	close(b.responses)
 	<-b.stopper
 }
 
-func (b *Broker) serverLoop() {
+func (b *MockBroker) serverLoop() {
 	defer close(b.stopper)
 	conn, err := b.listener.Accept()
 	if err != nil {
@@ -106,12 +110,12 @@ func (b *Broker) serverLoop() {
 	}
 }
 
-// NewBroker launches a fake Kafka broker. It takes a testing.T as provided by the test framework and a channel of responses to use.
+// NewMockBroker launches a fake Kafka broker. It takes a testing.T as provided by the test framework and a channel of responses to use.
 // If an error occurs it is simply logged to the testing.T and the broker exits.
-func NewBroker(t *testing.T, responses chan []byte) *Broker {
+func NewMockBroker(t *testing.T, responses chan []byte) *MockBroker {
 	var err error
 
-	broker := new(Broker)
+	broker := new(MockBroker)
 	broker.stopper = make(chan bool)
 	broker.responses = responses
 	broker.t = t
@@ -209,7 +213,7 @@ func TestBrokerID(t *testing.T) {
 
 func TestSimpleBrokerCommunication(t *testing.T) {
 	responses := make(chan []byte)
-	mockBroker := mock.NewBroker(t, responses)
+	mockBroker := NewMockBroker(t, responses)
 	defer mockBroker.Close()
 
 	broker := NewBroker("localhost", mockBroker.Port())
@@ -253,7 +257,7 @@ var brokerTestTable = []struct {
 	{[]byte{},
 		func(t *testing.T, broker *Broker) {
 			request := ProduceRequest{}
-			request.RequiredAcks = types.NO_RESPONSE
+			request.RequiredAcks = NO_RESPONSE
 			response, err := broker.Produce("clientID", &request)
 			if err != nil {
 				t.Error(err)
@@ -266,7 +270,7 @@ var brokerTestTable = []struct {
 	{[]byte{0x00, 0x00, 0x00, 0x00},
 		func(t *testing.T, broker *Broker) {
 			request := ProduceRequest{}
-			request.RequiredAcks = types.WAIT_FOR_LOCAL
+			request.RequiredAcks = WAIT_FOR_LOCAL
 			response, err := broker.Produce("clientID", &request)
 			if err != nil {
 				t.Error(err)
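The MockBroker above is what drives these tests: each expected response is fed through the channel, and the mock prefixes the length and correlation ID itself. A hedged sketch of the pattern, following the fire-and-forget case from the test table (the Connect step is assumed from the Broker API referenced in errors.go; it is not shown in this hunk):

package kafka

import "testing"

func TestProduceNoResponse(t *testing.T) {
	responses := make(chan []byte)
	mockBroker := NewMockBroker(t, responses)
	defer mockBroker.Close()

	broker := NewBroker("localhost", mockBroker.Port())
	// broker.Connect() presumably happens here; not shown in this diff.

	go func() {
		responses <- []byte{} // len 0: the mock sends nothing back
	}()

	request := ProduceRequest{}
	request.RequiredAcks = NO_RESPONSE // broker replies with nothing
	if _, err := broker.Produce("clientID", &request); err != nil {
		t.Error(err)
	}
}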

+ 6 - 6
crc32_field.go

@@ -5,26 +5,26 @@ import (
 	"hash/crc32"
 )
 
-// crc32field implements the pushEncoder and pushDecoder interfaces for calculating CRC32s.
-type crc32field struct {
+// crc32Field implements the pushEncoder and pushDecoder interfaces for calculating CRC32s.
+type crc32Field struct {
 	startOffset int
 }
 
-func (c *crc32field) saveOffset(in int) {
+func (c *crc32Field) saveOffset(in int) {
 	c.startOffset = in
 }
 
-func (c *crc32field) reserveLength() int {
+func (c *crc32Field) reserveLength() int {
 	return 4
 }
 
-func (c *crc32field) run(curOffset int, buf []byte) error {
+func (c *crc32Field) run(curOffset int, buf []byte) error {
 	crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
 	binary.BigEndian.PutUint32(buf[c.startOffset:], crc)
 	return nil
 }
 
-func (c *crc32field) check(curOffset int, buf []byte) error {
+func (c *crc32Field) check(curOffset int, buf []byte) error {
 	crc := crc32.ChecksumIEEE(buf[c.startOffset+4 : curOffset])
 
 	if crc != binary.BigEndian.Uint32(buf[c.startOffset:]) {
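The rename above is purely cosmetic (crc32field → crc32Field), but the mechanics deserve a note: the encoder reserves 4 bytes at startOffset, and run later checksums everything after that slot and writes the CRC into it; check recomputes and compares. A standalone sketch of the same arithmetic, using only the standard library:

package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

func main() {
	// Reserve 4 bytes (reserveLength), then append the payload.
	buf := append(make([]byte, 4), []byte("payload")...)

	// run(): checksum everything after the reserved slot and fill it in.
	crc := crc32.ChecksumIEEE(buf[4:])
	binary.BigEndian.PutUint32(buf[:4], crc)

	// check(): recompute and compare, exactly as the decoder side does.
	fmt.Println(crc32.ChecksumIEEE(buf[4:]) == binary.BigEndian.Uint32(buf[:4]))
}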

+ 6 - 1
errors.go

@@ -2,6 +2,12 @@ package kafka
 
 import "errors"
 
+// AlreadyConnected is the error returned when calling Connect() on a Broker that is already connected.
+var AlreadyConnected = errors.New("kafka: broker: already connected")
+
+// NotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
+var NotConnected = errors.New("kafka: broker: not connected")
+
 // EncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
 // if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
 var EncodingError = errors.New("kafka: Error while encoding packet.")
@@ -72,4 +78,3 @@ func (err KError) Error() string {
 		return "Unknown error, how did this happen?"
 	}
 }
-
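These two sentinels move here from protocol/errors.go (deleted further down). They are plain errors.New values, so the intended use is identity comparison; a hypothetical helper as a sketch (closeIfConnected is not part of the diff, and Broker.Close returning an error is assumed from the NotConnected doc comment):

package kafka

// closeIfConnected treats NotConnected as a no-op rather than a failure.
func closeIfConnected(b *Broker) error {
	err := b.Close()
	if err == NotConnected {
		return nil // never connected, nothing to tear down
	}
	return err
}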

+ 13 - 15
protocol/fetch_request.go → fetch_request.go

@@ -1,15 +1,13 @@
-package protocol
-
-import enc "sarama/encoding"
+package kafka
 
 type fetchRequestBlock struct {
 	fetchOffset int64
 	maxBytes    int32
 }
 
-func (f *fetchRequestBlock) Encode(pe enc.PacketEncoder) error {
-	pe.PutInt64(f.fetchOffset)
-	pe.PutInt32(f.maxBytes)
+func (f *fetchRequestBlock) encode(pe packetEncoder) error {
+	pe.putInt64(f.fetchOffset)
+	pe.putInt32(f.maxBytes)
 	return nil
 }
 
@@ -19,26 +17,26 @@ type FetchRequest struct {
 	blocks      map[string]map[int32]*fetchRequestBlock
 }
 
-func (f *FetchRequest) Encode(pe enc.PacketEncoder) (err error) {
-	pe.PutInt32(-1) // replica ID is always -1 for clients
-	pe.PutInt32(f.MaxWaitTime)
-	pe.PutInt32(f.MinBytes)
-	err = pe.PutArrayLength(len(f.blocks))
+func (f *FetchRequest) encode(pe packetEncoder) (err error) {
+	pe.putInt32(-1) // replica ID is always -1 for clients
+	pe.putInt32(f.MaxWaitTime)
+	pe.putInt32(f.MinBytes)
+	err = pe.putArrayLength(len(f.blocks))
 	if err != nil {
 		return err
 	}
 	for topic, blocks := range f.blocks {
-		err = pe.PutString(topic)
+		err = pe.putString(topic)
 		if err != nil {
 			return err
 		}
-		err = pe.PutArrayLength(len(blocks))
+		err = pe.putArrayLength(len(blocks))
 		if err != nil {
 			return err
 		}
 		for partition, block := range blocks {
-			pe.PutInt32(partition)
-			err = block.Encode(pe)
+			pe.putInt32(partition)
+			err = block.encode(pe)
 			if err != nil {
 				return err
 			}

+ 1 - 1
protocol/fetch_request_test.go → fetch_request_test.go

@@ -1,4 +1,4 @@
-package protocol
+package kafka
 
 import "testing"
 

+ 15 - 18
protocol/fetch_response.go → fetch_response.go

@@ -1,36 +1,33 @@
-package protocol
-
-import enc "sarama/encoding"
-import "sarama/types"
+package kafka
 
 type FetchResponseBlock struct {
-	Err                 types.KError
+	Err                 KError
 	HighWaterMarkOffset int64
 	MsgSet              MessageSet
 }
 
-func (pr *FetchResponseBlock) Decode(pd enc.PacketDecoder) (err error) {
-	tmp, err := pd.GetInt16()
+func (pr *FetchResponseBlock) decode(pd packetDecoder) (err error) {
+	tmp, err := pd.getInt16()
 	if err != nil {
 		return err
 	}
-	pr.Err = types.KError(tmp)
+	pr.Err = KError(tmp)
 
-	pr.HighWaterMarkOffset, err = pd.GetInt64()
+	pr.HighWaterMarkOffset, err = pd.getInt64()
 	if err != nil {
 		return err
 	}
 
-	msgSetSize, err := pd.GetInt32()
+	msgSetSize, err := pd.getInt32()
 	if err != nil {
 		return err
 	}
 
-	msgSetDecoder, err := pd.GetSubset(int(msgSetSize))
+	msgSetDecoder, err := pd.getSubset(int(msgSetSize))
 	if err != nil {
 		return err
 	}
-	err = (&pr.MsgSet).Decode(msgSetDecoder)
+	err = (&pr.MsgSet).decode(msgSetDecoder)
 
 	return err
 }
@@ -39,20 +36,20 @@ type FetchResponse struct {
 	Blocks map[string]map[int32]*FetchResponseBlock
 }
 
-func (fr *FetchResponse) Decode(pd enc.PacketDecoder) (err error) {
-	numTopics, err := pd.GetArrayLength()
+func (fr *FetchResponse) decode(pd packetDecoder) (err error) {
+	numTopics, err := pd.getArrayLength()
 	if err != nil {
 		return err
 	}
 
 	fr.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
 	for i := 0; i < numTopics; i++ {
-		name, err := pd.GetString()
+		name, err := pd.getString()
 		if err != nil {
 			return err
 		}
 
-		numBlocks, err := pd.GetArrayLength()
+		numBlocks, err := pd.getArrayLength()
 		if err != nil {
 			return err
 		}
@@ -60,13 +57,13 @@ func (fr *FetchResponse) Decode(pd enc.PacketDecoder) (err error) {
 		fr.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)
 
 		for j := 0; j < numBlocks; j++ {
-			id, err := pd.GetInt32()
+			id, err := pd.getInt32()
 			if err != nil {
 				return err
 			}
 
 			block := new(FetchResponseBlock)
-			err = block.Decode(pd)
+			err = block.decode(pd)
 			if err != nil {
 				return err
 			}

+ 3 - 4
protocol/fetch_response_test.go → fetch_response_test.go

@@ -1,10 +1,9 @@
-package protocol
+package kafka
 
 import (
 	"bytes"
 	"testing"
 )
-import kafka "sarama/types"
 
 var (
 	emptyFetchResponse = []byte{
@@ -47,7 +46,7 @@ func TestOneMessageFetchResponse(t *testing.T) {
 		if len(response.Blocks["topic"]) == 1 {
 			block := response.GetBlock("topic", 5)
 			if block != nil {
-				if block.Err != kafka.OFFSET_OUT_OF_RANGE {
+				if block.Err != OFFSET_OUT_OF_RANGE {
 					t.Error("Decoding didn't produce correct error code.")
 				}
 				if block.HighWaterMarkOffset != 0x10101010 {
@@ -62,7 +61,7 @@ func TestOneMessageFetchResponse(t *testing.T) {
 						t.Error("Decoding produced incorrect message offset.")
 					}
 					msg := msgBlock.Msg
-					if msg.Codec != kafka.COMPRESSION_NONE {
+					if msg.Codec != COMPRESSION_NONE {
 						t.Error("Decoding produced incorrect message compression.")
 					}
 					if msg.Key != nil {

+ 28 - 31
protocol/message.go → message.go

@@ -1,11 +1,9 @@
-package protocol
+package kafka
 
-import enc "sarama/encoding"
 import (
 	"bytes"
 	"compress/gzip"
 	"io/ioutil"
-	"sarama/types"
 )
 
 // CompressionCodec represents the various compression codecs recognized by Kafka in messages.
@@ -17,36 +15,35 @@ const (
 	COMPRESSION_SNAPPY CompressionCodec = 2
 )
 
-
 // The spec just says: "This is a version id used to allow backwards compatible evolution of the message
 // binary format." but it doesn't say what the current value is, so presumably 0...
 const message_format int8 = 0
 
 type Message struct {
-	Codec types.CompressionCodec // codec used to compress the message contents
-	Key   []byte                 // the message key, may be nil
-	Value []byte                 // the message contents
+	Codec CompressionCodec // codec used to compress the message contents
+	Key   []byte           // the message key, may be nil
+	Value []byte           // the message contents
 }
 
-func (m *Message) Encode(pe enc.PacketEncoder) error {
-	pe.Push(&enc.CRC32Field{})
+func (m *Message) encode(pe packetEncoder) error {
+	pe.push(&crc32Field{})
 
-	pe.PutInt8(message_format)
+	pe.putInt8(message_format)
 
 	var attributes int8 = 0
 	attributes |= int8(m.Codec) & 0x07
-	pe.PutInt8(attributes)
+	pe.putInt8(attributes)
 
-	err := pe.PutBytes(m.Key)
+	err := pe.putBytes(m.Key)
 	if err != nil {
 		return err
 	}
 
 	var body []byte
 	switch m.Codec {
-	case types.COMPRESSION_NONE:
+	case COMPRESSION_NONE:
 		body = m.Value
-	case types.COMPRESSION_GZIP:
+	case COMPRESSION_GZIP:
 		if m.Value != nil {
 			var buf bytes.Buffer
 			writer := gzip.NewWriter(&buf)
@@ -54,53 +51,53 @@ func (m *Message) Encode(pe enc.PacketEncoder) error {
 			writer.Close()
 			body = buf.Bytes()
 		}
-	case types.COMPRESSION_SNAPPY:
+	case COMPRESSION_SNAPPY:
 		// TODO
 	}
-	err = pe.PutBytes(body)
+	err = pe.putBytes(body)
 	if err != nil {
 		return err
 	}
 
-	return pe.Pop()
+	return pe.pop()
 }
 
-func (m *Message) Decode(pd enc.PacketDecoder) (err error) {
-	err = pd.Push(&enc.CRC32Field{})
+func (m *Message) decode(pd packetDecoder) (err error) {
+	err = pd.push(&crc32Field{})
 	if err != nil {
 		return err
 	}
 
-	format, err := pd.GetInt8()
+	format, err := pd.getInt8()
 	if err != nil {
 		return err
 	}
 	if format != message_format {
-		return enc.DecodingError
+		return DecodingError
 	}
 
-	attribute, err := pd.GetInt8()
+	attribute, err := pd.getInt8()
 	if err != nil {
 		return err
 	}
-	m.Codec = types.CompressionCodec(attribute & 0x07)
+	m.Codec = CompressionCodec(attribute & 0x07)
 
-	m.Key, err = pd.GetBytes()
+	m.Key, err = pd.getBytes()
 	if err != nil {
 		return err
 	}
 
-	m.Value, err = pd.GetBytes()
+	m.Value, err = pd.getBytes()
 	if err != nil {
 		return err
 	}
 
 	switch m.Codec {
-	case types.COMPRESSION_NONE:
+	case COMPRESSION_NONE:
 		// nothing to do
-	case types.COMPRESSION_GZIP:
+	case COMPRESSION_GZIP:
 		if m.Value == nil {
-			return enc.DecodingError
+			return DecodingError
 		}
 		reader, err := gzip.NewReader(bytes.NewReader(m.Value))
 		if err != nil {
@@ -110,13 +107,13 @@ func (m *Message) Decode(pd enc.PacketDecoder) (err error) {
 		if err != nil {
 			return err
 		}
-	case types.COMPRESSION_SNAPPY:
+	case COMPRESSION_SNAPPY:
 		// TODO
 	default:
-		return enc.DecodingError
+		return DecodingError
 	}
 
-	err = pd.Pop()
+	err = pd.pop()
 	if err != nil {
 		return err
 	}
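The gzip branches above are the only non-trivial part of Message: encode wraps Value in a gzip stream, and decode unwraps it again (rejecting a nil Value). A runnable round-trip of just that compression step, matching the stdlib calls used above:

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io/ioutil"
)

func main() {
	value := []byte("hello kafka")

	// Encode path (COMPRESSION_GZIP): wrap Value in a gzip stream.
	var buf bytes.Buffer
	writer := gzip.NewWriter(&buf)
	writer.Write(value)
	writer.Close()
	body := buf.Bytes()

	// Decode path: unwrap again, as Message.decode does.
	reader, err := gzip.NewReader(bytes.NewReader(body))
	if err != nil {
		panic(err)
	}
	out, _ := ioutil.ReadAll(reader)
	fmt.Printf("%s\n", out) // hello kafka
}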

+ 17 - 19
protocol/message_set.go → message_set.go

@@ -1,40 +1,38 @@
-package protocol
-
-import enc "sarama/encoding"
+package kafka
 
 type MessageBlock struct {
 	Offset int64
 	Msg    *Message
 }
 
-func (msb *MessageBlock) Encode(pe enc.PacketEncoder) error {
-	pe.PutInt64(msb.Offset)
-	pe.Push(&enc.LengthField{})
-	err := msb.Msg.Encode(pe)
+func (msb *MessageBlock) encode(pe packetEncoder) error {
+	pe.putInt64(msb.Offset)
+	pe.push(&lengthField{})
+	err := msb.Msg.encode(pe)
 	if err != nil {
 		return err
 	}
-	return pe.Pop()
+	return pe.pop()
 }
 
-func (msb *MessageBlock) Decode(pd enc.PacketDecoder) (err error) {
-	msb.Offset, err = pd.GetInt64()
+func (msb *MessageBlock) decode(pd packetDecoder) (err error) {
+	msb.Offset, err = pd.getInt64()
 	if err != nil {
 		return err
 	}
 
-	pd.Push(&enc.LengthField{})
+	err = pd.push(&lengthField{})
 	if err != nil {
 		return err
 	}
 
 	msb.Msg = new(Message)
-	err = msb.Msg.Decode(pd)
+	err = msb.Msg.decode(pd)
 	if err != nil {
 		return err
 	}
 
-	err = pd.Pop()
+	err = pd.pop()
 	if err != nil {
 		return err
 	}
@@ -47,9 +45,9 @@ type MessageSet struct {
 	Messages               []*MessageBlock
 }
 
-func (ms *MessageSet) Encode(pe enc.PacketEncoder) error {
+func (ms *MessageSet) encode(pe packetEncoder) error {
 	for i := range ms.Messages {
-		err := ms.Messages[i].Encode(pe)
+		err := ms.Messages[i].encode(pe)
 		if err != nil {
 			return err
 		}
@@ -57,16 +55,16 @@ func (ms *MessageSet) Encode(pe enc.PacketEncoder) error {
 	return nil
 }
 
-func (ms *MessageSet) Decode(pd enc.PacketDecoder) (err error) {
+func (ms *MessageSet) decode(pd packetDecoder) (err error) {
 	ms.Messages = nil
 
-	for pd.Remaining() > 0 {
+	for pd.remaining() > 0 {
 		msb := new(MessageBlock)
-		err = msb.Decode(pd)
+		err = msb.decode(pd)
 		switch err {
 		case nil:
 			ms.Messages = append(ms.Messages, msb)
-		case enc.InsufficientData:
+		case InsufficientData:
 			// As an optimization the server is allowed to return a partial message at the
 			// end of the message set. Clients should handle this case. So we just ignore such things.
 			ms.PartialTrailingMessage = true

+ 4 - 6
protocol/metadata_request.go → metadata_request.go

@@ -1,19 +1,17 @@
-package protocol
-
-import enc "sarama/encoding"
+package kafka
 
 type MetadataRequest struct {
 	Topics []string
 }
 
-func (mr *MetadataRequest) Encode(pe enc.PacketEncoder) error {
-	err := pe.PutArrayLength(len(mr.Topics))
+func (mr *MetadataRequest) encode(pe packetEncoder) error {
+	err := pe.putArrayLength(len(mr.Topics))
 	if err != nil {
 		return err
 	}
 
 	for i := range mr.Topics {
-		err = pe.PutString(mr.Topics[i])
+		err = pe.putString(mr.Topics[i])
 		if err != nil {
 			return err
 		}

+ 1 - 1
protocol/metadata_request_test.go → metadata_request_test.go

@@ -1,4 +1,4 @@
-package protocol
+package kafka
 
 import "testing"
 

+ 21 - 24
protocol/metadata_response.go → metadata_response.go

@@ -1,39 +1,36 @@
-package protocol
-
-import enc "sarama/encoding"
-import "sarama/types"
+package kafka
 
 type PartitionMetadata struct {
-	Err      types.KError
+	Err      KError
 	Id       int32
 	Leader   int32
 	Replicas []int32
 	Isr      []int32
 }
 
-func (pm *PartitionMetadata) Decode(pd enc.PacketDecoder) (err error) {
-	tmp, err := pd.GetInt16()
+func (pm *PartitionMetadata) decode(pd packetDecoder) (err error) {
+	tmp, err := pd.getInt16()
 	if err != nil {
 		return err
 	}
-	pm.Err = types.KError(tmp)
+	pm.Err = KError(tmp)
 
-	pm.Id, err = pd.GetInt32()
+	pm.Id, err = pd.getInt32()
 	if err != nil {
 		return err
 	}
 
-	pm.Leader, err = pd.GetInt32()
+	pm.Leader, err = pd.getInt32()
 	if err != nil {
 		return err
 	}
 
-	pm.Replicas, err = pd.GetInt32Array()
+	pm.Replicas, err = pd.getInt32Array()
 	if err != nil {
 		return err
 	}
 
-	pm.Isr, err = pd.GetInt32Array()
+	pm.Isr, err = pd.getInt32Array()
 	if err != nil {
 		return err
 	}
@@ -42,31 +39,31 @@ func (pm *PartitionMetadata) Decode(pd enc.PacketDecoder) (err error) {
 }
 
 type TopicMetadata struct {
-	Err        types.KError
+	Err        KError
 	Name       string
 	Partitions []*PartitionMetadata
 }
 
-func (tm *TopicMetadata) Decode(pd enc.PacketDecoder) (err error) {
-	tmp, err := pd.GetInt16()
+func (tm *TopicMetadata) decode(pd packetDecoder) (err error) {
+	tmp, err := pd.getInt16()
 	if err != nil {
 		return err
 	}
-	tm.Err = types.KError(tmp)
+	tm.Err = KError(tmp)
 
-	tm.Name, err = pd.GetString()
+	tm.Name, err = pd.getString()
 	if err != nil {
 		return err
 	}
 
-	n, err := pd.GetArrayLength()
+	n, err := pd.getArrayLength()
 	if err != nil {
 		return err
 	}
 	tm.Partitions = make([]*PartitionMetadata, n)
 	for i := 0; i < n; i++ {
 		tm.Partitions[i] = new(PartitionMetadata)
-		err = tm.Partitions[i].Decode(pd)
+		err = tm.Partitions[i].decode(pd)
 		if err != nil {
 			return err
 		}
@@ -80,8 +77,8 @@ type MetadataResponse struct {
 	Topics  []*TopicMetadata
 }
 
-func (m *MetadataResponse) Decode(pd enc.PacketDecoder) (err error) {
-	n, err := pd.GetArrayLength()
+func (m *MetadataResponse) decode(pd packetDecoder) (err error) {
+	n, err := pd.getArrayLength()
 	if err != nil {
 		return err
 	}
@@ -89,13 +86,13 @@ func (m *MetadataResponse) Decode(pd enc.PacketDecoder) (err error) {
 	m.Brokers = make([]*Broker, n)
 	for i := 0; i < n; i++ {
 		m.Brokers[i] = new(Broker)
-		err = m.Brokers[i].Decode(pd)
+		err = m.Brokers[i].decode(pd)
 		if err != nil {
 			return err
 		}
 	}
 
-	n, err = pd.GetArrayLength()
+	n, err = pd.getArrayLength()
 	if err != nil {
 		return err
 	}
@@ -103,7 +100,7 @@ func (m *MetadataResponse) Decode(pd enc.PacketDecoder) (err error) {
 	m.Topics = make([]*TopicMetadata, n)
 	for i := 0; i < n; i++ {
 		m.Topics[i] = new(TopicMetadata)
-		err = m.Topics[i].Decode(pd)
+		err = m.Topics[i].decode(pd)
 		if err != nil {
 			return err
 		}

+ 4 - 5
protocol/metadata_response_test.go → metadata_response_test.go

@@ -1,7 +1,6 @@
-package protocol
+package kafka
 
 import "testing"
-import "sarama/types"
 
 var (
 	emptyMetadataResponse = []byte{
@@ -90,14 +89,14 @@ func TestMetadataResponseWithTopics(t *testing.T) {
 		t.Error("Decoding produced", len(response.Brokers), "brokers where there were none!")
 	}
 	if len(response.Topics) == 2 {
-		if response.Topics[0].Err != types.NO_ERROR {
+		if response.Topics[0].Err != NO_ERROR {
 			t.Error("Decoding produced invalid topic 0 error.")
 		}
 		if response.Topics[0].Name != "foo" {
 			t.Error("Decoding produced invalid topic 0 name.")
 		}
 		if len(response.Topics[0].Partitions) == 1 {
-			if response.Topics[0].Partitions[0].Err != types.INVALID_MESSAGE_SIZE {
+			if response.Topics[0].Partitions[0].Err != INVALID_MESSAGE_SIZE {
 				t.Error("Decoding produced invalid topic 0 partition 0 error.")
 			}
 			if response.Topics[0].Partitions[0].Id != 0x01 {
@@ -121,7 +120,7 @@ func TestMetadataResponseWithTopics(t *testing.T) {
 		} else {
 			t.Error("Decoding produced invalid partition count for topic 0.")
 		}
-		if response.Topics[1].Err != types.NO_ERROR {
+		if response.Topics[1].Err != NO_ERROR {
 			t.Error("Decoding produced invalid topic 1 error.")
 		}
 		if response.Topics[1].Name != "bar" {

+ 11 - 13
protocol/offset_commit_request.go → offset_commit_request.go

@@ -1,15 +1,13 @@
-package protocol
-
-import enc "sarama/encoding"
+package kafka
 
 type offsetCommitRequestBlock struct {
 	offset   int64
 	metadata string
 }
 
-func (r *offsetCommitRequestBlock) Encode(pe enc.PacketEncoder) error {
-	pe.PutInt64(r.offset)
-	return pe.PutString(r.metadata)
+func (r *offsetCommitRequestBlock) encode(pe packetEncoder) error {
+	pe.putInt64(r.offset)
+	return pe.putString(r.metadata)
 }
 
 type OffsetCommitRequest struct {
@@ -17,27 +15,27 @@ type OffsetCommitRequest struct {
 	blocks        map[string]map[int32]*offsetCommitRequestBlock
 }
 
-func (r *OffsetCommitRequest) Encode(pe enc.PacketEncoder) error {
-	err := pe.PutString(r.ConsumerGroup)
+func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
+	err := pe.putString(r.ConsumerGroup)
 	if err != nil {
 		return err
 	}
-	err = pe.PutArrayLength(len(r.blocks))
+	err = pe.putArrayLength(len(r.blocks))
 	if err != nil {
 		return err
 	}
 	for topic, partitions := range r.blocks {
-		err = pe.PutString(topic)
+		err = pe.putString(topic)
 		if err != nil {
 			return err
 		}
-		err = pe.PutArrayLength(len(partitions))
+		err = pe.putArrayLength(len(partitions))
 		if err != nil {
 			return err
 		}
 		for partition, block := range partitions {
-			pe.PutInt32(partition)
-			err = block.Encode(pe)
+			pe.putInt32(partition)
+			err = block.encode(pe)
 			if err != nil {
 				return err
 			}

+ 1 - 1
protocol/offset_commit_request_test.go → offset_commit_request_test.go

@@ -1,4 +1,4 @@
-package protocol
+package kafka
 
 import "testing"
 

+ 48 - 0
offset_commit_response.go

@@ -0,0 +1,48 @@
+package kafka
+
+type OffsetCommitResponse struct {
+	ClientID string
+	Errors   map[string]map[int32]KError
+}
+
+func (r *OffsetCommitResponse) decode(pd packetDecoder) (err error) {
+	r.ClientID, err = pd.getString()
+	if err != nil {
+		return err
+	}
+
+	numTopics, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	r.Errors = make(map[string]map[int32]KError, numTopics)
+	for i := 0; i < numTopics; i++ {
+		name, err := pd.getString()
+		if err != nil {
+			return err
+		}
+
+		numErrors, err := pd.getArrayLength()
+		if err != nil {
+			return err
+		}
+
+		r.Errors[name] = make(map[int32]KError, numErrors)
+
+		for j := 0; j < numErrors; j++ {
+			id, err := pd.getInt32()
+			if err != nil {
+				return err
+			}
+
+			tmp, err := pd.getInt16()
+			if err != nil {
+				return err
+			}
+			r.Errors[name][id] = KError(tmp)
+		}
+	}
+
+	return nil
+}

+ 2 - 3
protocol/offset_commit_response_test.go → offset_commit_response_test.go

@@ -1,7 +1,6 @@
-package protocol
+package kafka
 
 import "testing"
-import "sarama/types"
 
 var (
 	emptyOffsetCommitResponse = []byte{
@@ -45,7 +44,7 @@ func TestNormalOffsetCommitResponse(t *testing.T) {
 			t.Error("Decoding produced errors for topic 'm' where there were none.")
 		}
 		if len(response.Errors["t"]) == 1 {
-			if response.Errors["t"][0] != types.NOT_LEADER_FOR_PARTITION {
+			if response.Errors["t"][0] != NOT_LEADER_FOR_PARTITION {
 				t.Error("Decoding produced wrong error for topic 't' partition 0.")
 			}
 		} else {

+ 6 - 8
protocol/offset_fetch_request.go → offset_fetch_request.go

@@ -1,27 +1,25 @@
-package protocol
-
-import enc "sarama/encoding"
+package kafka
 
 type OffsetFetchRequest struct {
 	ConsumerGroup string
 	partitions    map[string][]int32
 }
 
-func (r *OffsetFetchRequest) Encode(pe enc.PacketEncoder) error {
-	err := pe.PutString(r.ConsumerGroup)
+func (r *OffsetFetchRequest) encode(pe packetEncoder) error {
+	err := pe.putString(r.ConsumerGroup)
 	if err != nil {
 		return err
 	}
-	err = pe.PutArrayLength(len(r.partitions))
+	err = pe.putArrayLength(len(r.partitions))
 	if err != nil {
 		return err
 	}
 	for topic, partitions := range r.partitions {
-		err = pe.PutString(topic)
+		err = pe.putString(topic)
 		if err != nil {
 			return err
 		}
-		pe.PutInt32Array(partitions)
+		pe.putInt32Array(partitions)
 	}
 	return nil
 }

+ 1 - 1
protocol/offset_fetch_request_test.go → offset_fetch_request_test.go

@@ -1,4 +1,4 @@
-package protocol
+package kafka
 
 import "testing"
 

+ 14 - 17
protocol/offset_fetch_response.go → offset_fetch_response.go

@@ -1,30 +1,27 @@
-package protocol
-
-import enc "sarama/encoding"
-import "sarama/types"
+package kafka
 
 type OffsetFetchResponseBlock struct {
 	Offset   int64
 	Metadata string
-	Err      types.KError
+	Err      KError
 }
 
-func (r *OffsetFetchResponseBlock) Decode(pd enc.PacketDecoder) (err error) {
-	r.Offset, err = pd.GetInt64()
+func (r *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
+	r.Offset, err = pd.getInt64()
 	if err != nil {
 		return err
 	}
 
-	r.Metadata, err = pd.GetString()
+	r.Metadata, err = pd.getString()
 	if err != nil {
 		return err
 	}
 
-	tmp, err := pd.GetInt16()
+	tmp, err := pd.getInt16()
 	if err != nil {
 		return err
 	}
-	r.Err = types.KError(tmp)
+	r.Err = KError(tmp)
 
 	return nil
 }
@@ -34,25 +31,25 @@ type OffsetFetchResponse struct {
 	Blocks   map[string]map[int32]*OffsetFetchResponseBlock
 }
 
-func (r *OffsetFetchResponse) Decode(pd enc.PacketDecoder) (err error) {
-	r.ClientID, err = pd.GetString()
+func (r *OffsetFetchResponse) decode(pd packetDecoder) (err error) {
+	r.ClientID, err = pd.getString()
 	if err != nil {
 		return err
 	}
 
-	numTopics, err := pd.GetArrayLength()
+	numTopics, err := pd.getArrayLength()
 	if err != nil {
 		return err
 	}
 
 	r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
 	for i := 0; i < numTopics; i++ {
-		name, err := pd.GetString()
+		name, err := pd.getString()
 		if err != nil {
 			return err
 		}
 
-		numBlocks, err := pd.GetArrayLength()
+		numBlocks, err := pd.getArrayLength()
 		if err != nil {
 			return err
 		}
@@ -60,13 +57,13 @@ func (r *OffsetFetchResponse) Decode(pd enc.PacketDecoder) (err error) {
 		r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
 
 		for j := 0; j < numBlocks; j++ {
-			id, err := pd.GetInt32()
+			id, err := pd.getInt32()
 			if err != nil {
 				return err
 			}
 
 			block := new(OffsetFetchResponseBlock)
-			err = block.Decode(pd)
+			err = block.decode(pd)
 			if err != nil {
 				return err
 			}

+ 2 - 3
protocol/offset_fetch_response_test.go → offset_fetch_response_test.go

@@ -1,7 +1,6 @@
-package protocol
+package kafka
 
 import "testing"
-import "sarama/types"
 
 var (
 	emptyOffsetFetchResponse = []byte{
@@ -53,7 +52,7 @@ func TestNormalOffsetFetchResponse(t *testing.T) {
 			if response.Blocks["t"][0].Metadata != "md" {
 				t.Error("Decoding produced wrong metadata for topic 't' partition 0.")
 			}
-			if response.Blocks["t"][0].Err != types.REQUEST_TIMED_OUT {
+			if response.Blocks["t"][0].Err != REQUEST_TIMED_OUT {
 				t.Error("Decoding produced wrong error for topic 't' partition 0.")
 			}
 		} else {

+ 13 - 16
protocol/offset_request.go → offset_request.go

@@ -1,7 +1,4 @@
-package protocol
-
-import enc "sarama/encoding"
-import "sarama/types"
+package kafka
 
 // OffsetTime is used in Offset Requests to ask for all messages before a certain time. Any positive int64
 // value will be interpreted as milliseconds, or use the special constants defined here.
@@ -16,13 +13,13 @@ const (
 )
 
 type offsetRequestBlock struct {
-	time       types.OffsetTime
+	time       OffsetTime
 	maxOffsets int32
 }
 
-func (r *offsetRequestBlock) Encode(pe enc.PacketEncoder) error {
-	pe.PutInt64(int64(r.time))
-	pe.PutInt32(r.maxOffsets)
+func (r *offsetRequestBlock) encode(pe packetEncoder) error {
+	pe.putInt64(int64(r.time))
+	pe.putInt32(r.maxOffsets)
 	return nil
 }
 
@@ -30,24 +27,24 @@ type OffsetRequest struct {
 	blocks map[string]map[int32]*offsetRequestBlock
 }
 
-func (r *OffsetRequest) Encode(pe enc.PacketEncoder) error {
-	pe.PutInt32(-1) // replica ID is always -1 for clients
-	err := pe.PutArrayLength(len(r.blocks))
+func (r *OffsetRequest) encode(pe packetEncoder) error {
+	pe.putInt32(-1) // replica ID is always -1 for clients
+	err := pe.putArrayLength(len(r.blocks))
 	if err != nil {
 		return err
 	}
 	for topic, partitions := range r.blocks {
-		err = pe.PutString(topic)
+		err = pe.putString(topic)
 		if err != nil {
 			return err
 		}
-		err = pe.PutArrayLength(len(partitions))
+		err = pe.putArrayLength(len(partitions))
 		if err != nil {
 			return err
 		}
 		for partition, block := range partitions {
-			pe.PutInt32(partition)
-			err = block.Encode(pe)
+			pe.putInt32(partition)
+			err = block.encode(pe)
 			if err != nil {
 				return err
 			}
@@ -64,7 +61,7 @@ func (r *OffsetRequest) version() int16 {
 	return 0
 }
 
-func (r *OffsetRequest) AddBlock(topic string, partition_id int32, time types.OffsetTime, maxOffsets int32) {
+func (r *OffsetRequest) AddBlock(topic string, partition_id int32, time OffsetTime, maxOffsets int32) {
 	if r.blocks == nil {
 		r.blocks = make(map[string]map[int32]*offsetRequestBlock)
 	}
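As the OffsetTime comment notes, any positive int64 is read as a millisecond timestamp, with special negative constants for the common cases (the const block itself is elided from this hunk). A short sketch of AddBlock usage under that reading:

package kafka

// buildOffsetRequest is a hypothetical helper, not part of this commit.
func buildOffsetRequest() *OffsetRequest {
	r := new(OffsetRequest)
	// Ask for at most one offset of messages before this ms timestamp.
	r.AddBlock("myTopic", 0, OffsetTime(1368460800000), 1)
	return r
}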

+ 1 - 1
protocol/offset_request_test.go → offset_request_test.go

@@ -1,4 +1,4 @@
-package protocol
+package kafka
 
 import "testing"
 

+ 12 - 15
protocol/offset_response.go → offset_response.go

@@ -1,21 +1,18 @@
-package protocol
-
-import enc "sarama/encoding"
-import "sarama/types"
+package kafka
 
 type OffsetResponseBlock struct {
-	Err     types.KError
+	Err     KError
 	Offsets []int64
 }
 
-func (r *OffsetResponseBlock) Decode(pd enc.PacketDecoder) (err error) {
-	tmp, err := pd.GetInt16()
+func (r *OffsetResponseBlock) decode(pd packetDecoder) (err error) {
+	tmp, err := pd.getInt16()
 	if err != nil {
 		return err
 	}
-	r.Err = types.KError(tmp)
+	r.Err = KError(tmp)
 
-	r.Offsets, err = pd.GetInt64Array()
+	r.Offsets, err = pd.getInt64Array()
 
 	return err
 }
@@ -24,20 +21,20 @@ type OffsetResponse struct {
 	Blocks map[string]map[int32]*OffsetResponseBlock
 }
 
-func (r *OffsetResponse) Decode(pd enc.PacketDecoder) (err error) {
-	numTopics, err := pd.GetArrayLength()
+func (r *OffsetResponse) decode(pd packetDecoder) (err error) {
+	numTopics, err := pd.getArrayLength()
 	if err != nil {
 		return err
 	}
 
 	r.Blocks = make(map[string]map[int32]*OffsetResponseBlock, numTopics)
 	for i := 0; i < numTopics; i++ {
-		name, err := pd.GetString()
+		name, err := pd.getString()
 		if err != nil {
 			return err
 		}
 
-		numBlocks, err := pd.GetArrayLength()
+		numBlocks, err := pd.getArrayLength()
 		if err != nil {
 			return err
 		}
@@ -45,13 +42,13 @@ func (r *OffsetResponse) Decode(pd enc.PacketDecoder) (err error) {
 		r.Blocks[name] = make(map[int32]*OffsetResponseBlock, numBlocks)
 
 		for j := 0; j < numBlocks; j++ {
-			id, err := pd.GetInt32()
+			id, err := pd.getInt32()
 			if err != nil {
 				return err
 			}
 
 			block := new(OffsetResponseBlock)
-			err = block.Decode(pd)
+			err = block.decode(pd)
 			if err != nil {
 				return err
 			}

+ 2 - 3
protocol/offset_response_test.go → offset_response_test.go

@@ -1,7 +1,6 @@
-package protocol
+package kafka
 
 import "testing"
-import "sarama/types"
 
 var (
 	emptyOffsetResponse = []byte{
@@ -41,7 +40,7 @@ func TestNormalOffsetResponse(t *testing.T) {
 		}
 
 		if len(response.Blocks["z"]) == 1 {
-			if response.Blocks["z"][2].Err != types.NO_ERROR {
+			if response.Blocks["z"][2].Err != NO_ERROR {
 				t.Error("Decoding produced invalid error for topic z partition 2.")
 			}
 			if len(response.Blocks["z"][2].Offsets) == 2 {

+ 12 - 16
protocol/produce_request.go → produce_request.go

@@ -1,7 +1,4 @@
-package protocol
-
-import enc "sarama/encoding"
-import "sarama/types"
+package kafka
 
 // RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements
 // it must see before responding. Any positive int16 value is valid, or the constants defined here.
@@ -13,37 +10,36 @@ const (
 	WAIT_FOR_ALL   RequiredAcks = -1 // Wait for all replicas to commit before responding.
 )
 
-
 type ProduceRequest struct {
-	RequiredAcks types.RequiredAcks
+	RequiredAcks RequiredAcks
 	Timeout      int32
 	msgSets      map[string]map[int32]*MessageSet
 }
 
-func (p *ProduceRequest) Encode(pe enc.PacketEncoder) error {
-	pe.PutInt16(int16(p.RequiredAcks))
-	pe.PutInt32(p.Timeout)
-	err := pe.PutArrayLength(len(p.msgSets))
+func (p *ProduceRequest) encode(pe packetEncoder) error {
+	pe.putInt16(int16(p.RequiredAcks))
+	pe.putInt32(p.Timeout)
+	err := pe.putArrayLength(len(p.msgSets))
 	if err != nil {
 		return err
 	}
 	for topic, partitions := range p.msgSets {
-		err = pe.PutString(topic)
+		err = pe.putString(topic)
 		if err != nil {
 			return err
 		}
-		err = pe.PutArrayLength(len(partitions))
+		err = pe.putArrayLength(len(partitions))
 		if err != nil {
 			return err
 		}
 		for id, msgSet := range partitions {
-			pe.PutInt32(id)
-			pe.Push(&enc.LengthField{})
-			err = msgSet.Encode(pe)
+			pe.putInt32(id)
+			pe.push(&lengthField{})
+			err = msgSet.encode(pe)
 			if err != nil {
 				return err
 			}
-			err = pe.Pop()
+			err = pe.pop()
 			if err != nil {
 				return err
 			}
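RequiredAcks ties back to the Broker.Produce special case in broker.go above: with NO_RESPONSE the broker sends no reply at all, so Produce returns a nil *ProduceResponse. A sketch of a fire-and-forget produce (the helper itself is hypothetical; AddMessage's signature is taken from produce_request_test.go below):

package kafka

// produceFireAndForget sends one message and expects no acknowledgement.
func produceFireAndForget(b *Broker, value []byte) error {
	request := &ProduceRequest{RequiredAcks: NO_RESPONSE, Timeout: 100}
	request.AddMessage("myTopic", 0, &Message{Codec: COMPRESSION_NONE, Value: value})
	_, err := b.Produce("clientID", request) // response is nil here
	return err
}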

+ 2 - 3
protocol/produce_request_test.go → produce_request_test.go

@@ -1,7 +1,6 @@
-package protocol
+package kafka
 
 import "testing"
-import "sarama/types"
 
 var (
 	produceRequestEmpty = []byte{
@@ -41,6 +40,6 @@ func TestProduceRequest(t *testing.T) {
 	request.Timeout = 0x444
 	testEncodable(t, "header", request, produceRequestHeader)
 
-	request.AddMessage("topic", 0xAD, &Message{Codec: types.COMPRESSION_NONE, Key: nil, Value: []byte{0x00, 0xEE}})
+	request.AddMessage("topic", 0xAD, &Message{Codec: COMPRESSION_NONE, Key: nil, Value: []byte{0x00, 0xEE}})
 	testEncodable(t, "one message", request, produceRequestOneMessage)
 }

+ 12 - 15
protocol/produce_response.go → produce_response.go

@@ -1,21 +1,18 @@
-package protocol
-
-import enc "sarama/encoding"
-import "sarama/types"
+package kafka
 
 type ProduceResponseBlock struct {
-	Err    types.KError
+	Err    KError
 	Offset int64
 }
 
-func (pr *ProduceResponseBlock) Decode(pd enc.PacketDecoder) (err error) {
-	tmp, err := pd.GetInt16()
+func (pr *ProduceResponseBlock) decode(pd packetDecoder) (err error) {
+	tmp, err := pd.getInt16()
 	if err != nil {
 		return err
 	}
-	pr.Err = types.KError(tmp)
+	pr.Err = KError(tmp)
 
-	pr.Offset, err = pd.GetInt64()
+	pr.Offset, err = pd.getInt64()
 	if err != nil {
 		return err
 	}
@@ -27,20 +24,20 @@ type ProduceResponse struct {
 	Blocks map[string]map[int32]*ProduceResponseBlock
 }
 
-func (pr *ProduceResponse) Decode(pd enc.PacketDecoder) (err error) {
-	numTopics, err := pd.GetArrayLength()
+func (pr *ProduceResponse) decode(pd packetDecoder) (err error) {
+	numTopics, err := pd.getArrayLength()
 	if err != nil {
 		return err
 	}
 
 	pr.Blocks = make(map[string]map[int32]*ProduceResponseBlock, numTopics)
 	for i := 0; i < numTopics; i++ {
-		name, err := pd.GetString()
+		name, err := pd.getString()
 		if err != nil {
 			return err
 		}
 
-		numBlocks, err := pd.GetArrayLength()
+		numBlocks, err := pd.getArrayLength()
 		if err != nil {
 			return err
 		}
@@ -48,13 +45,13 @@ func (pr *ProduceResponse) Decode(pd enc.PacketDecoder) (err error) {
 		pr.Blocks[name] = make(map[int32]*ProduceResponseBlock, numBlocks)
 
 		for j := 0; j < numBlocks; j++ {
-			id, err := pd.GetInt32()
+			id, err := pd.getInt32()
 			if err != nil {
 				return err
 			}
 
 			block := new(ProduceResponseBlock)
-			err = block.Decode(pd)
+			err = block.decode(pd)
 			if err != nil {
 				return err
 			}

+ 3 - 4
protocol/produce_response_test.go → produce_response_test.go

@@ -1,7 +1,6 @@
-package protocol
+package kafka
 
 import "testing"
-import "sarama/types"
 
 var (
 	produceResponseNoBlocks = []byte{
@@ -47,7 +46,7 @@ func TestProduceResponse(t *testing.T) {
 	if block == nil {
 		t.Error("Decoding did not produce a block for bar/1")
 	} else {
-		if block.Err != types.NO_ERROR {
+		if block.Err != NO_ERROR {
 			t.Error("Decoding failed for bar/1/Err, got:", int16(block.Err))
 		}
 		if block.Offset != 0xFF {
@@ -58,7 +57,7 @@ func TestProduceResponse(t *testing.T) {
 	if block == nil {
 		t.Error("Decoding did not produce a block for bar/2")
 	} else {
-		if block.Err != types.INVALID_MESSAGE {
+		if block.Err != INVALID_MESSAGE {
 			t.Error("Decoding failed for bar/2/Err, got:", int16(block.Err))
 		}
 		if block.Offset != 0 {

+ 0 - 9
protocol/errors.go

@@ -1,9 +0,0 @@
-package protocol
-
-import "errors"
-
-// AlreadyConnected is the error returned when calling Connect() on a Broker that is already connected.
-var AlreadyConnected = errors.New("kafka: broker: already connected")
-
-// NotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
-var NotConnected = errors.New("kafka: broker: not connected")

+ 0 - 51
protocol/offset_commit_response.go

@@ -1,51 +0,0 @@
-package protocol
-
-import enc "sarama/encoding"
-import "sarama/types"
-
-type OffsetCommitResponse struct {
-	ClientID string
-	Errors   map[string]map[int32]types.KError
-}
-
-func (r *OffsetCommitResponse) Decode(pd enc.PacketDecoder) (err error) {
-	r.ClientID, err = pd.GetString()
-	if err != nil {
-		return err
-	}
-
-	numTopics, err := pd.GetArrayLength()
-	if err != nil {
-		return err
-	}
-
-	r.Errors = make(map[string]map[int32]types.KError, numTopics)
-	for i := 0; i < numTopics; i++ {
-		name, err := pd.GetString()
-		if err != nil {
-			return err
-		}
-
-		numErrors, err := pd.GetArrayLength()
-		if err != nil {
-			return err
-		}
-
-		r.Errors[name] = make(map[int32]types.KError, numErrors)
-
-		for j := 0; j < numErrors; j++ {
-			id, err := pd.GetInt32()
-			if err != nil {
-				return err
-			}
-
-			tmp, err := pd.GetInt16()
-			if err != nil {
-				return err
-			}
-			r.Errors[name][id] = types.KError(tmp)
-		}
-	}
-
-	return nil
-}

+ 0 - 31
protocol/request.go

@@ -1,31 +0,0 @@
-package protocol
-
-import enc "sarama/encoding"
-
-type requestEncoder interface {
-	enc.Encoder
-	key() int16
-	version() int16
-}
-
-type request struct {
-	correlation_id int32
-	id             string
-	body           requestEncoder
-}
-
-func (r *request) Encode(pe enc.PacketEncoder) (err error) {
-	pe.Push(&enc.LengthField{})
-	pe.PutInt16(r.body.key())
-	pe.PutInt16(r.body.version())
-	pe.PutInt32(r.correlation_id)
-	err = pe.PutString(r.id)
-	if err != nil {
-		return err
-	}
-	err = r.body.Encode(pe)
-	if err != nil {
-		return err
-	}
-	return pe.Pop()
-}

+ 0 - 22
protocol/response_header.go

@@ -1,22 +0,0 @@
-package protocol
-
-import "math"
-import enc "sarama/encoding"
-
-type responseHeader struct {
-	length         int32
-	correlation_id int32
-}
-
-func (r *responseHeader) Decode(pd enc.PacketDecoder) (err error) {
-	r.length, err = pd.GetInt32()
-	if err != nil {
-		return err
-	}
-	if r.length <= 4 || r.length > 2*math.MaxUint16 {
-		return enc.DecodingError
-	}
-
-	r.correlation_id, err = pd.GetInt32()
-	return err
-}

+ 29 - 0
request.go

@@ -0,0 +1,29 @@
+package kafka
+
+type requestEncoder interface {
+	encoder
+	key() int16
+	version() int16
+}
+
+type request struct {
+	correlation_id int32
+	id             string
+	body           requestEncoder
+}
+
+func (r *request) encode(pe packetEncoder) (err error) {
+	pe.push(&lengthField{})
+	pe.putInt16(r.body.key())
+	pe.putInt16(r.body.version())
+	pe.putInt32(r.correlation_id)
+	err = pe.putString(r.id)
+	if err != nil {
+		return err
+	}
+	err = r.body.encode(pe)
+	if err != nil {
+		return err
+	}
+	return pe.pop()
+}
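request.encode above fixes the wire framing for every outbound request: a 4-byte length (via lengthField), then api key, api version, correlation id, the length-prefixed client id string, and the body. A standalone sketch that reproduces the same big-endian layout (the key value 3 is just an illustrative number, not taken from this diff):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// frame mirrors the layout request.encode produces.
func frame(key, version int16, correlationID int32, clientID string, body []byte) []byte {
	var payload bytes.Buffer
	binary.Write(&payload, binary.BigEndian, key)
	binary.Write(&payload, binary.BigEndian, version)
	binary.Write(&payload, binary.BigEndian, correlationID)
	binary.Write(&payload, binary.BigEndian, int16(len(clientID))) // putString: int16 length...
	payload.WriteString(clientID)                                  // ...then the raw bytes
	payload.Write(body)

	var out bytes.Buffer
	binary.Write(&out, binary.BigEndian, int32(payload.Len())) // lengthField push/pop
	payload.WriteTo(&out)
	return out.Bytes()
}

func main() {
	fmt.Printf("% x\n", frame(3, 0, 0, "clientID", nil))
}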

+ 7 - 8
protocol/request_test.go → request_test.go

@@ -1,6 +1,5 @@
-package protocol
+package kafka
 
-import enc "sarama/encoding"
 import (
 	"bytes"
 	"testing"
@@ -27,8 +26,8 @@ func (s *testRequestBody) version() int16 {
 	return 0xD2
 }
 
-func (s *testRequestBody) Encode(pe enc.PacketEncoder) error {
-	return pe.PutString("abc")
+func (s *testRequestBody) encode(pe packetEncoder) error {
+	return pe.putString("abc")
 }
 
 func TestRequest(t *testing.T) {
@@ -39,8 +38,8 @@ func TestRequest(t *testing.T) {
 // not specific to request tests, just helper functions for testing structures that
 // implement the encoder or decoder interfaces that needed somewhere to live
 
-func testEncodable(t *testing.T, name string, in enc.Encoder, expect []byte) {
-	packet, err := enc.Encode(in)
+func testEncodable(t *testing.T, name string, in encoder, expect []byte) {
+	packet, err := encode(in)
 	if err != nil {
 		t.Error(err)
 	} else if !bytes.Equal(packet, expect) {
@@ -48,8 +47,8 @@ func testEncodable(t *testing.T, name string, in enc.Encoder, expect []byte) {
 	}
 }
 
-func testDecodable(t *testing.T, name string, out enc.Decoder, in []byte) {
-	err := enc.Decode(in, out)
+func testDecodable(t *testing.T, name string, out decoder, in []byte) {
+	err := decode(in, out)
 	if err != nil {
 		t.Error("Decoding", name, "failed:", err)
 	}

+ 21 - 0
response_header.go

@@ -0,0 +1,21 @@
+package kafka
+
+import "math"
+
+type responseHeader struct {
+	length         int32
+	correlation_id int32
+}
+
+func (r *responseHeader) decode(pd packetDecoder) (err error) {
+	r.length, err = pd.getInt32()
+	if err != nil {
+		return err
+	}
+	if r.length <= 4 || r.length > 2*math.MaxUint16 {
+		return DecodingError
+	}
+
+	r.correlation_id, err = pd.getInt32()
+	return err
+}

+ 1 - 1
protocol/response_header_test.go → response_header_test.go

@@ -1,4 +1,4 @@
-package protocol
+package kafka
 
 import "testing"