ソースを参照

checkpoint wip more package splitting

Evan Huus 12 年 前
コミット
d548e0b95b

+ 1 - 1
protocol/encoder_decoder.go → encoding/encoder_decoder.go

@@ -1,4 +1,4 @@
-package protocol
+package encoding
 
 
 // Kafka Encoding
 // Kafka Encoding
 
 

+ 28 - 0
encoding/errors.go

@@ -0,0 +1,28 @@
+package encoding
+
+import "fmt"
+
+// EncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
+// if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
+type EncodingError string
+
+func (err EncodingError) Error() string {
+	return "kafka: Could not encode packet. " + string(err)
+}
+
+// InsufficientData is returned when decoding and the packet is truncated. This can be expected
+// when requesting messages, since as an optimization the server is allowed to return a partial message at the end
+// of the message set.
+type InsufficientData int
+
+func (err InsufficientData) Error() string {
+	return fmt.Sprintf("kafka: Insufficient data to decode packet, at least %d more bytes expected.", int(err))
+}
+
+// DecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response.
+// This can be a bad CRC or length field, or any other invalid value.
+type DecodingError string
+
+func (err DecodingError) Error() string {
+	return "kafka: Could not decode packet. " + string(err)
+}

+ 1 - 1
protocol/packet_crcs.go → encoding/packet_crcs.go

@@ -1,4 +1,4 @@
-package protocol
+package encoding
 
 
 import (
 import (
 	"encoding/binary"
 	"encoding/binary"

+ 1 - 2
protocol/packet_decoder.go → encoding/packet_decoder.go

@@ -1,4 +1,4 @@
-package protocol
+package encoding
 
 
 type packetDecoder interface {
 type packetDecoder interface {
 	remaining() int
 	remaining() int
@@ -15,7 +15,6 @@ type packetDecoder interface {
 	getArrayCount() (int, error)
 	getArrayCount() (int, error)
 
 
 	// misc
 	// misc
-	getError() (KError, error)
 	getString() (string, error)
 	getString() (string, error)
 	getBytes() ([]byte, error)
 	getBytes() ([]byte, error)
 	getSubset(length int) (packetDecoder, error)
 	getSubset(length int) (packetDecoder, error)

+ 1 - 2
protocol/packet_encoder.go → encoding/packet_encoder.go

@@ -1,4 +1,4 @@
-package protocol
+package encoding
 
 
 type packetEncoder interface {
 type packetEncoder interface {
 	// primitives
 	// primitives
@@ -12,7 +12,6 @@ type packetEncoder interface {
 	putArrayCount(in int)
 	putArrayCount(in int)
 
 
 	// misc
 	// misc
-	putError(in KError)
 	putString(in string)
 	putString(in string)
 	putBytes(in []byte)
 	putBytes(in []byte)
 	putRaw(in []byte)
 	putRaw(in []byte)

+ 1 - 1
protocol/packet_lengths.go → encoding/packet_lengths.go

@@ -1,4 +1,4 @@
-package protocol
+package encoding
 
 
 import "encoding/binary"
 import "encoding/binary"
 
 

+ 1 - 5
protocol/prep_encoder.go → encoding/prep_encoder.go

@@ -1,4 +1,4 @@
-package protocol
+package encoding
 
 
 import "math"
 import "math"
 
 
@@ -38,10 +38,6 @@ func (pe *prepEncoder) putArrayCount(in int) {
 
 
 // misc
 // misc
 
 
-func (pe *prepEncoder) putError(in KError) {
-	pe.length += 2
-}
-
 func (pe *prepEncoder) putString(in string) {
 func (pe *prepEncoder) putString(in string) {
 	pe.length += 2
 	pe.length += 2
 	if len(in) > math.MaxInt16 {
 	if len(in) > math.MaxInt16 {

+ 1 - 6
protocol/real_decoder.go → encoding/real_decoder.go

@@ -1,4 +1,4 @@
-package protocol
+package encoding
 
 
 import (
 import (
 	"encoding/binary"
 	"encoding/binary"
@@ -111,11 +111,6 @@ func (rd *realDecoder) getArrayCount() (int, error) {
 
 
 // misc
 // misc
 
 
-func (rd *realDecoder) getError() (KError, error) {
-	val, err := rd.getInt16()
-	return KError(val), err
-}
-
 func (rd *realDecoder) getString() (string, error) {
 func (rd *realDecoder) getString() (string, error) {
 	tmp, err := rd.getInt16()
 	tmp, err := rd.getInt16()
 
 

+ 1 - 5
protocol/real_encoder.go → encoding/real_encoder.go

@@ -1,4 +1,4 @@
-package protocol
+package encoding
 
 
 import "encoding/binary"
 import "encoding/binary"
 
 
@@ -45,10 +45,6 @@ func (re *realEncoder) putArrayCount(in int) {
 
 
 // misc
 // misc
 
 
-func (re *realEncoder) putError(in KError) {
-	re.putInt16(int16(in))
-}
-
 func (re *realEncoder) putString(in string) {
 func (re *realEncoder) putString(in string) {
 	re.putInt16(int16(len(in)))
 	re.putInt16(int16(len(in)))
 	copy(re.raw[re.off:], in)
 	copy(re.raw[re.off:], in)

+ 0 - 25
protocol/encoder_decoder_test.go

@@ -1,25 +0,0 @@
-package protocol
-
-import (
-	"bytes"
-	"testing"
-)
-
-// no actual tests, just helper functions for testing structures that
-// implement the encoder or decoder interfaces
-
-func testEncodable(t *testing.T, name string, in encoder, expect []byte) {
-	packet, err := encode(in)
-	if err != nil {
-		t.Error(err)
-	} else if !bytes.Equal(packet, expect) {
-		t.Error("Encoding", name, "failed\ngot ", packet, "\nwant", expect)
-	}
-}
-
-func testDecodable(t *testing.T, name string, out decoder, in []byte) {
-	err := decode(in, out)
-	if err != nil {
-		t.Error("Decoding", name, "failed:", err)
-	}
-}

+ 1 - 29
protocol/errors.go

@@ -1,9 +1,6 @@
 package protocol
 package protocol
 
 
-import (
-	"errors"
-	"fmt"
-)
+import "errors"
 
 
 // KError is the type of error that can be returned directly by the Kafka broker.
 // KError is the type of error that can be returned directly by the Kafka broker.
 // See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
 // See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
@@ -68,28 +65,3 @@ var AlreadyConnected = errors.New("kafka: broker: already connected")
 
 
 // NotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
 // NotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
 var NotConnected = errors.New("kafka: broker: not connected")
 var NotConnected = errors.New("kafka: broker: not connected")
-
-// EncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
-// if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
-type EncodingError string
-
-func (err EncodingError) Error() string {
-	return "kafka: Could not encode packet. " + string(err)
-}
-
-// InsufficientData is returned when decoding and the packet is truncated. This can be expected
-// when requesting messages, since as an optimization the server is allowed to return a partial message at the end
-// of the message set.
-type InsufficientData int
-
-func (err InsufficientData) Error() string {
-	return fmt.Sprintf("kafka: Insufficient data to decode packet, at least %d more bytes expected.", int(err))
-}
-
-// DecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response.
-// This can be a bad CRC or length field, or any other invalid value.
-type DecodingError string
-
-func (err DecodingError) Error() string {
-	return "kafka: Could not decode packet. " + string(err)
-}

+ 4 - 12
protocol/message.go

@@ -4,15 +4,7 @@ import (
 	"bytes"
 	"bytes"
 	"compress/gzip"
 	"compress/gzip"
 	"io/ioutil"
 	"io/ioutil"
-)
-
-// The various compression codec recognized by Kafka in messages.
-type CompressionCodec int
-
-const (
-	COMPRESSION_NONE   CompressionCodec = 0
-	COMPRESSION_GZIP   CompressionCodec = 1
-	COMPRESSION_SNAPPY CompressionCodec = 2
+	"sarama/types"
 )
 )
 
 
 // The spec just says: "This is a version id used to allow backwards compatible evolution of the message
 // The spec just says: "This is a version id used to allow backwards compatible evolution of the message
@@ -20,7 +12,7 @@ const (
 const message_format int8 = 0
 const message_format int8 = 0
 
 
 type Message struct {
 type Message struct {
-	Codec CompressionCodec // codec used to compress the message contents
+	Codec types.CompressionCodec // codec used to compress the message contents
 	Key   []byte           // the message key, may be nil
 	Key   []byte           // the message key, may be nil
 	Value []byte           // the message contents
 	Value []byte           // the message contents
 }
 }
@@ -31,7 +23,7 @@ func (m *Message) encode(pe packetEncoder) {
 	pe.putInt8(message_format)
 	pe.putInt8(message_format)
 
 
 	var attributes int8 = 0
 	var attributes int8 = 0
-	attributes |= int8(m.Codec & 0x07)
+	attributes |= m.Codec & 0x07
 	pe.putInt8(attributes)
 	pe.putInt8(attributes)
 
 
 	pe.putBytes(m.Key)
 	pe.putBytes(m.Key)
@@ -74,7 +66,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	m.Codec = CompressionCodec(attribute & 0x07)
+	m.Codec = attribute & 0x07
 
 
 	m.Key, err = pd.getBytes()
 	m.Key, err = pd.getBytes()
 	if err != nil {
 	if err != nil {

+ 0 - 9
protocol/offset_request.go

@@ -1,14 +1,5 @@
 package protocol
 package protocol
 
 
-// Special values accepted by Kafka for the 'time' parameter of OffsetRequest.AddBlock().
-const (
-	// Ask for the latest offsets.
-	LATEST_OFFSETS int64 = -1
-	// Ask for the earliest available offset. Note that because offsets are pulled in descending order,
-	// asking for the earliest offset will always return you a single element.
-	EARLIEST_OFFSET int64 = -2
-)
-
 type offsetRequestBlock struct {
 type offsetRequestBlock struct {
 	time       int64
 	time       int64
 	maxOffsets int32
 	maxOffsets int32

+ 0 - 7
protocol/produce_request.go

@@ -1,12 +1,5 @@
 package protocol
 package protocol
 
 
-// Special values accepted by Kafka for the RequiredAcks member of produce requests.
-const (
-	NO_RESPONSE    int16 = 0  // Don't send any response, the TCP ACK is all you get.
-	WAIT_FOR_LOCAL int16 = 1  // Wait for only the local commit to succeed before responding.
-	WAIT_FOR_ALL   int16 = -1 // Wait for all replicas to commit before responding.
-)
-
 type ProduceRequest struct {
 type ProduceRequest struct {
 	RequiredAcks int16
 	RequiredAcks int16
 	Timeout      int32
 	Timeout      int32

+ 23 - 1
protocol/request_test.go

@@ -1,6 +1,9 @@
 package protocol
 package protocol
 
 
-import "testing"
+import (
+	"bytes"
+	"testing"
+)
 
 
 var (
 var (
 	requestSimple = []byte{
 	requestSimple = []byte{
@@ -31,3 +34,22 @@ func TestRequest(t *testing.T) {
 	request := request{correlation_id: 0x1234, id: "myClient", body: new(testRequestBody)}
 	request := request{correlation_id: 0x1234, id: "myClient", body: new(testRequestBody)}
 	testEncodable(t, "simple", &request, requestSimple)
 	testEncodable(t, "simple", &request, requestSimple)
 }
 }
+
+// not specific to request tests, just helper functions for testing structures that
+// implement the encoder or decoder interfaces that needed somewhere to live
+
+func testEncodable(t *testing.T, name string, in encoder, expect []byte) {
+	packet, err := encode(in)
+	if err != nil {
+		t.Error(err)
+	} else if !bytes.Equal(packet, expect) {
+		t.Error("Encoding", name, "failed\ngot ", packet, "\nwant", expect)
+	}
+}
+
+func testDecodable(t *testing.T, name string, out decoder, in []byte) {
+	err := decode(in, out)
+	if err != nil {
+		t.Error("Decoding", name, "failed:", err)
+	}
+}

+ 94 - 0
types/types.go

@@ -0,0 +1,94 @@
+/*
+Package types provides access to the types and constants that the Kafka protocol uses,
+since they may be needed by all levels of the saramago stack.
+*/
+package types
+
+// KError is the type of error that can be returned directly by the Kafka broker.
+// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
+type KError int16
+
+const (
+	NO_ERROR                    KError = 0
+	UNKNOWN                     KError = -1
+	OFFSET_OUT_OF_RANGE         KError = 1
+	INVALID_MESSAGE             KError = 2
+	UNKNOWN_TOPIC_OR_PARTITION  KError = 3
+	INVALID_MESSAGE_SIZE        KError = 4
+	LEADER_NOT_AVAILABLE        KError = 5
+	NOT_LEADER_FOR_PARTITION    KError = 6
+	REQUEST_TIMED_OUT           KError = 7
+	BROKER_NOT_AVAILABLE        KError = 8
+	REPLICA_NOT_AVAILABLE       KError = 9
+	MESSAGE_SIZE_TOO_LARGE      KError = 10
+	STALE_CONTROLLER_EPOCH_CODE KError = 11
+	OFFSET_METADATA_TOO_LARGE   KError = 12
+)
+
+func (err KError) Error() string {
+	// Error messages stolen/adapted from
+	// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
+	switch err {
+	case NO_ERROR:
+		return "kafka server: Not an error, why are you printing me?"
+	case UNKNOWN:
+		return "kafka server: Unexpected (unknown?) server error."
+	case OFFSET_OUT_OF_RANGE:
+		return "kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition."
+	case INVALID_MESSAGE:
+		return "kafka server: Message contents does not match its CRC."
+	case UNKNOWN_TOPIC_OR_PARTITION:
+		return "kafka server: Request was for a topic or partition that does not exist on this broker."
+	case INVALID_MESSAGE_SIZE:
+		return "kafka server: The message has a negative size."
+	case LEADER_NOT_AVAILABLE:
+		return "kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes."
+	case NOT_LEADER_FOR_PARTITION:
+		return "kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date."
+	case REQUEST_TIMED_OUT:
+		return "kafka server: Request exceeded the user-specified time limit in the request."
+	case BROKER_NOT_AVAILABLE:
+		return "kafka server: Broker not available. Not a client facing error, we should never receive this!!!"
+	case REPLICA_NOT_AVAILABLE:
+		return "kafka server: Replica not available. What is the difference between this and LeaderNotAvailable?"
+	case MESSAGE_SIZE_TOO_LARGE:
+		return "kafka server: Message was too large, server rejected it to avoid allocation error."
+	case STALE_CONTROLLER_EPOCH_CODE:
+		return "kafka server: Stale controller epoch code. ???"
+	case OFFSET_METADATA_TOO_LARGE:
+		return "kafka server: Specified a string larger than the configured maximum for offset metadata."
+	default:
+		return "Unknown error, how did this happen?"
+	}
+}
+
+// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
+type CompressionCodec int8
+
+const (
+	COMPRESSION_NONE   CompressionCodec = 0
+	COMPRESSION_GZIP   CompressionCodec = 1
+	COMPRESSION_SNAPPY CompressionCodec = 2
+)
+
+// RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements
+// it must see before responding. Any positive int16 value is valid, or the constants defined here.
+type RequiredAcks int16
+
+const (
+	NO_RESPONSE    RequiredAcks = 0  // Don't send any response, the TCP ACK is all you get.
+	WAIT_FOR_LOCAL RequiredAcks = 1  // Wait for only the local commit to succeed before responding.
+	WAIT_FOR_ALL   RequiredAcks = -1 // Wait for all replicas to commit before responding.
+)
+
+// OffsetTime is used in Offset Requests to ask for all messages before a certain time. Any positive int64
+// value will be interpreted as milliseconds, or use the special constants defined here.
+type OffsetTime int64
+
+const (
+	// Ask for the latest offsets.
+	LATEST_OFFSETS OffsetTime = -1
+	// Ask for the earliest available offset. Note that because offsets are pulled in descending order,
+	// asking for the earliest offset will always return you a single element.
+	EARLIEST_OFFSET OffsetTime = -2
+)