
wip remove namespacing

Evan Huus 12 years ago
parent commit
313b615938
8 changed files with 220 additions and 248 deletions
  1. errors.go (+59 -0)
  2. mock/broker.go (+0 -145)
  3. mock/broker_test.go (+0 -9)
  4. protocol/broker_test.go (+128 -0)
  5. protocol/message.go (+10 -0)
  6. protocol/offset_request.go (+12 -0)
  7. protocol/produce_request.go (+11 -0)
  8. types/types.go (+0 -94)

+ 59 - 0
errors.go

@@ -14,3 +14,62 @@ var InsufficientData = errors.New("kafka: Insufficient data to decode packet, mo
 // DecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response.
 // This can be a bad CRC or length field, or any other invalid value.
 var DecodingError = errors.New("kafka: Error while decoding packet.")
+
+// KError is the type of error that can be returned directly by the Kafka broker.
+// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
+type KError int16
+
+const (
+	NO_ERROR                    KError = 0
+	UNKNOWN                     KError = -1
+	OFFSET_OUT_OF_RANGE         KError = 1
+	INVALID_MESSAGE             KError = 2
+	UNKNOWN_TOPIC_OR_PARTITION  KError = 3
+	INVALID_MESSAGE_SIZE        KError = 4
+	LEADER_NOT_AVAILABLE        KError = 5
+	NOT_LEADER_FOR_PARTITION    KError = 6
+	REQUEST_TIMED_OUT           KError = 7
+	BROKER_NOT_AVAILABLE        KError = 8
+	REPLICA_NOT_AVAILABLE       KError = 9
+	MESSAGE_SIZE_TOO_LARGE      KError = 10
+	STALE_CONTROLLER_EPOCH_CODE KError = 11
+	OFFSET_METADATA_TOO_LARGE   KError = 12
+)
+
+func (err KError) Error() string {
+	// Error messages stolen/adapted from
+	// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
+	switch err {
+	case NO_ERROR:
+		return "kafka server: Not an error, why are you printing me?"
+	case UNKNOWN:
+		return "kafka server: Unexpected (unknown?) server error."
+	case OFFSET_OUT_OF_RANGE:
+		return "kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition."
+	case INVALID_MESSAGE:
+		return "kafka server: Message contents does not match its CRC."
+	case UNKNOWN_TOPIC_OR_PARTITION:
+		return "kafka server: Request was for a topic or partition that does not exist on this broker."
+	case INVALID_MESSAGE_SIZE:
+		return "kafka server: The message has a negative size."
+	case LEADER_NOT_AVAILABLE:
+		return "kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes."
+	case NOT_LEADER_FOR_PARTITION:
+		return "kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date."
+	case REQUEST_TIMED_OUT:
+		return "kafka server: Request exceeded the user-specified time limit in the request."
+	case BROKER_NOT_AVAILABLE:
+		return "kafka server: Broker not available. Not a client facing error, we should never receive this!!!"
+	case REPLICA_NOT_AVAILABLE:
+		return "kafka server: Replica not available. What is the difference between this and LeaderNotAvailable?"
+	case MESSAGE_SIZE_TOO_LARGE:
+		return "kafka server: Message was too large, server rejected it to avoid allocation error."
+	case STALE_CONTROLLER_EPOCH_CODE:
+		return "kafka server: Stale controller epoch code. ???"
+	case OFFSET_METADATA_TOO_LARGE:
+		return "kafka server: Specified a string larger than the configured maximum for offset metadata."
+	default:
+		return "Unknown error, how did this happen?"
+	}
+}
+

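Since KError implements the error interface, codes decoded from a broker response can be branched on directly. A minimal caller-side sketch of that pattern (handleKafkaError is an illustrative name, not part of this commit; the int16 is assumed to have been decoded from the wire already):

func handleKafkaError(code int16) error {
	kerr := KError(code)
	switch kerr {
	case NO_ERROR:
		return nil
	case LEADER_NOT_AVAILABLE, NOT_LEADER_FOR_PARTITION:
		// Stale metadata; a real client would refresh metadata and retry.
		return kerr
	default:
		// KError satisfies error via the Error() method above.
		return kerr
	}
}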
+ 0 - 145
mock/broker.go

@@ -1,145 +0,0 @@
-/*
-Package mock defines some simple helper functions for mocking Kafka brokers.
-
-It exists solely for testing other parts of the Sarama stack. It is in its own
-package so that it can be imported by tests in multiple different packages.
-*/
-package mock
-
-import (
-	"encoding/binary"
-	"io"
-	"net"
-	"strconv"
-	"testing"
-	"time"
-)
-
-// Broker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
-// accepts a single connection. It reads Kafka requests from that connection and returns each response
-// from the channel provided at creation-time (if a response has a len of 0, nothing is sent; if a
-// response is nil, the server sleeps for 250ms instead of reading a request).
-//
-// When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs
-// waiting for a response, the test panics.
-//
-// It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
-// automatically as a convenience.
-type Broker struct {
-	port      int32
-	stopper   chan bool
-	responses chan []byte
-	listener  net.Listener
-	t         *testing.T
-}
-
-// Port is the kernel-selected TCP port the broker is listening on.
-func (b *Broker) Port() int32 {
-	return b.port
-}
-
-// Close closes the response channel originally provided, then waits to make sure
-// that all requests/responses matched up before exiting.
-func (b *Broker) Close() {
-	close(b.responses)
-	<-b.stopper
-}
-
-func (b *Broker) serverLoop() {
-	defer close(b.stopper)
-	conn, err := b.listener.Accept()
-	if err != nil {
-		b.t.Error(err)
-		conn.Close()
-		b.listener.Close()
-		return
-	}
-	reqHeader := make([]byte, 4)
-	resHeader := make([]byte, 8)
-	for response := range b.responses {
-		if response == nil {
-			time.Sleep(250 * time.Millisecond)
-			continue
-		}
-		_, err := io.ReadFull(conn, reqHeader)
-		if err != nil {
-			b.t.Error(err)
-			conn.Close()
-			b.listener.Close()
-			return
-		}
-		body := make([]byte, binary.BigEndian.Uint32(reqHeader))
-		if len(body) < 10 {
-			b.t.Error("Kafka request too short.")
-			conn.Close()
-			b.listener.Close()
-			return
-		}
-		_, err = io.ReadFull(conn, body)
-		if err != nil {
-			b.t.Error(err)
-			conn.Close()
-			b.listener.Close()
-			return
-		}
-		if len(response) == 0 {
-			continue
-		}
-		binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
-		binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
-		_, err = conn.Write(resHeader)
-		if err != nil {
-			b.t.Error(err)
-			conn.Close()
-			b.listener.Close()
-			return
-		}
-		_, err = conn.Write(response)
-		if err != nil {
-			b.t.Error(err)
-			conn.Close()
-			b.listener.Close()
-			return
-		}
-	}
-	err = conn.Close()
-	if err != nil {
-		b.t.Error(err)
-		b.listener.Close()
-		return
-	}
-	err = b.listener.Close()
-	if err != nil {
-		b.t.Error(err)
-		return
-	}
-}
-
-// NewBroker launches a fake Kafka broker. It takes a testing.T as provided by the test framework and a channel of responses to use.
-// If an error occurs it is simply logged to the testing.T and the broker exits.
-func NewBroker(t *testing.T, responses chan []byte) *Broker {
-	var err error
-
-	broker := new(Broker)
-	broker.stopper = make(chan bool)
-	broker.responses = responses
-	broker.t = t
-
-	broker.listener, err = net.Listen("tcp", "localhost:0")
-	if err != nil {
-		t.Fatal(err)
-	}
-	_, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
-	if err != nil {
-		t.Fatal(err)
-	}
-	tmp, err := strconv.ParseInt(portStr, 10, 32)
-	if err != nil {
-		t.Fatal(err)
-	}
-	broker.port = int32(tmp)
-
-	go broker.serverLoop()
-
-	return broker
-}

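This file is removed here, but the same code reappears below in protocol/broker_test.go. The framing convenience its doc comment mentions is visible in serverLoop: the mock prepends a 4-byte big-endian length (response length plus the 4-byte correlation ID) and copies the correlation ID from body[4:8] of the request. A standalone sketch of that framing, useful when hand-checking expected bytes (frameResponse is an illustrative helper, not part of this commit):

import "encoding/binary"

// frameResponse mirrors the mock broker's framing:
// [4-byte length = len(body)+4][4-byte correlation ID][body].
func frameResponse(correlationID uint32, body []byte) []byte {
	framed := make([]byte, 8+len(body))
	binary.BigEndian.PutUint32(framed, uint32(len(body)+4)) // the length field excludes itself
	binary.BigEndian.PutUint32(framed[4:], correlationID)   // echoed back from the request
	copy(framed[8:], body)
	return framed
}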
+ 0 - 9
mock/broker_test.go

@@ -1,9 +0,0 @@
-package mock
-
-import "testing"
-
-func ExampleBroker(t *testing.T) {
-	responses := make(chan []byte)
-	broker := NewBroker(t, responses)
-	defer broker.Close()
-}

+ 128 - 0
protocol/broker_test.go

@@ -6,6 +6,134 @@ import (
 	"sarama/types"
 	"testing"
 )
+// Broker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
+// accepts a single connection. It reads Kafka requests from that connection and returns each response
+// from the channel provided at creation-time (if a response has a len of 0, nothing is sent; if a
+// response is nil, the server sleeps for 250ms instead of reading a request).
+//
+// When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs
+// waiting for a response, the test panics.
+//
+// It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
+// automatically as a convenience.
+type Broker struct {
+	port      int32
+	stopper   chan bool
+	responses chan []byte
+	listener  net.Listener
+	t         *testing.T
+}
+
+// Port is the kernel-selected TCP port the broker is listening on.
+func (b *Broker) Port() int32 {
+	return b.port
+}
+
+// Close closes the response channel originally provided, then waits to make sure
+// that all requests/responses matched up before exiting.
+func (b *Broker) Close() {
+	close(b.responses)
+	<-b.stopper
+}
+
+func (b *Broker) serverLoop() {
+	defer close(b.stopper)
+	conn, err := b.listener.Accept()
+	if err != nil {
+		b.t.Error(err)
+		conn.Close()
+		b.listener.Close()
+		return
+	}
+	reqHeader := make([]byte, 4)
+	resHeader := make([]byte, 8)
+	for response := range b.responses {
+		if response == nil {
+			time.Sleep(250 * time.Millisecond)
+			continue
+		}
+		_, err := io.ReadFull(conn, reqHeader)
+		if err != nil {
+			b.t.Error(err)
+			conn.Close()
+			b.listener.Close()
+			return
+		}
+		body := make([]byte, binary.BigEndian.Uint32(reqHeader))
+		if len(body) < 10 {
+			b.t.Error("Kafka request too short.")
+			conn.Close()
+			b.listener.Close()
+			return
+		}
+		_, err = io.ReadFull(conn, body)
+		if err != nil {
+			b.t.Error(err)
+			conn.Close()
+			b.listener.Close()
+			return
+		}
+		if len(response) == 0 {
+			continue
+		}
+		binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
+		binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
+		_, err = conn.Write(resHeader)
+		if err != nil {
+			b.t.Error(err)
+			conn.Close()
+			b.listener.Close()
+			return
+		}
+		_, err = conn.Write(response)
+		if err != nil {
+			b.t.Error(err)
+			conn.Close()
+			b.listener.Close()
+			return
+		}
+	}
+	err = conn.Close()
+	if err != nil {
+		b.t.Error(err)
+		b.listener.Close()
+		return
+	}
+	err = b.listener.Close()
+	if err != nil {
+		b.t.Error(err)
+		return
+	}
+}
+
+// NewBroker launches a fake Kafka broker. It takes a testing.T as provided by the test framework and a channel of responses to use.
+// If an error occurs it is simply logged to the testing.T and the broker exits.
+func NewBroker(t *testing.T, responses chan []byte) *Broker {
+	var err error
+
+	broker := new(Broker)
+	broker.stopper = make(chan bool)
+	broker.responses = responses
+	broker.t = t
+
+	broker.listener, err = net.Listen("tcp", "localhost:0")
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
+	if err != nil {
+		t.Fatal(err)
+	}
+	tmp, err := strconv.ParseInt(portStr, 10, 32)
+	if err != nil {
+		t.Fatal(err)
+	}
+	broker.port = int32(tmp)
+
+	go broker.serverLoop()
+
+	return broker
+}
 
 func ExampleBroker() error {
 	broker := NewBroker("localhost", 9092)

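With the mock relocated into broker_test.go, a test drives it by pushing pre-encoded response bodies onto the channel: nil makes the server sleep for 250ms, a zero-length slice swallows one request without replying, and anything else is framed and written back. A minimal round-trip sketch assuming the NewBroker(t, responses) signature above (the request bytes are placeholders, sized to pass the 10-byte minimum the mock enforces):

import (
	"fmt"
	"net"
	"testing"
)

// TestMockBrokerRoundTrip is an illustrative sketch, not part of this commit.
func TestMockBrokerRoundTrip(t *testing.T) {
	responses := make(chan []byte)
	broker := NewBroker(t, responses)
	defer broker.Close()

	conn, err := net.Dial("tcp", fmt.Sprintf("localhost:%d", broker.Port()))
	if err != nil {
		t.Fatal(err)
	}
	defer conn.Close()

	// 4-byte length prefix (10), then a 10-byte placeholder body whose
	// body[4:8] acts as the correlation ID the mock echoes back.
	conn.Write([]byte{0, 0, 0, 10, 0, 0, 0, 0, 0, 0, 0, 42, 0, 0})
	responses <- []byte{0, 0, 0, 0} // placeholder response body
}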
+ 10 - 0
protocol/message.go

@@ -8,6 +8,16 @@ import (
 	"sarama/types"
 )
 
+// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
+type CompressionCodec int8
+
+const (
+	COMPRESSION_NONE   CompressionCodec = 0
+	COMPRESSION_GZIP   CompressionCodec = 1
+	COMPRESSION_SNAPPY CompressionCodec = 2
+)
+
+
 // The spec just says: "This is a version id used to allow backwards compatible evolution of the message
 // binary format." but it doesn't say what the current value is, so presumably 0...
 const message_format int8 = 0

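On the wire the codec travels in the low bits of a message's attributes byte rather than as a standalone field. A sketch of packing and unpacking under that assumption (the two-bit mask matches the 0.8-era message format; the helper names are illustrative, not part of this commit):

// Assumes the 0.8-era attribute layout in which the low two bits
// of the attributes byte carry the compression codec.
func packAttributes(codec CompressionCodec) int8 {
	return int8(codec) & 0x03
}

func unpackCodec(attributes int8) CompressionCodec {
	return CompressionCodec(attributes & 0x03)
}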
+ 12 - 0
protocol/offset_request.go

@@ -3,6 +3,18 @@ package protocol
 import enc "sarama/encoding"
 import "sarama/types"
 
+// OffsetTime is used in Offset Requests to ask for all messages before a certain time. Any positive int64
+// value will be interpreted as milliseconds, or use the special constants defined here.
+type OffsetTime int64
+
+const (
+	// Ask for the latest offsets.
+	LATEST_OFFSETS OffsetTime = -1
+	// Ask for the earliest available offset. Note that because offsets are pulled in descending order,
+	// asking for the earliest offset will always return you a single element.
+	EARLIEST_OFFSET OffsetTime = -2
+)
+
 type offsetRequestBlock struct {
 	time       types.OffsetTime
 	maxOffsets int32

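Because any positive OffsetTime is interpreted as milliseconds, sentinel constants and wall-clock cutoffs can be produced the same way. A short sketch (offsetCutoffs is an illustrative helper, not part of this commit):

import "time"

// offsetCutoffs shows the two kinds of OffsetTime value: a real
// millisecond timestamp ("everything before now") and a sentinel.
func offsetCutoffs() (OffsetTime, OffsetTime) {
	beforeNow := OffsetTime(time.Now().UnixNano() / int64(time.Millisecond))
	return beforeNow, EARLIEST_OFFSET
}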
+ 11 - 0
protocol/produce_request.go

@@ -3,6 +3,17 @@ package protocol
 import enc "sarama/encoding"
 import "sarama/types"
 
+// RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements
+// it must see before responding. Any positive int16 value is valid, or the constants defined here.
+type RequiredAcks int16
+
+const (
+	NO_RESPONSE    RequiredAcks = 0  // Don't send any response, the TCP ACK is all you get.
+	WAIT_FOR_LOCAL RequiredAcks = 1  // Wait for only the local commit to succeed before responding.
+	WAIT_FOR_ALL   RequiredAcks = -1 // Wait for all replicas to commit before responding.
+)
+
+
 type ProduceRequest struct {
 	RequiredAcks types.RequiredAcks
 	Timeout      int32

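A ProduceRequest literal then reads naturally against these constants. A sketch, assuming the wip migration is finished so that the struct's RequiredAcks field (still typed as types.RequiredAcks above) has been switched to the local type, and taking the Timeout to be in milliseconds:

// exampleProduceRequest is illustrative; it assumes ProduceRequest.RequiredAcks
// has been migrated from types.RequiredAcks to the RequiredAcks type above.
func exampleProduceRequest() ProduceRequest {
	return ProduceRequest{
		RequiredAcks: WAIT_FOR_LOCAL, // respond once the leader's local commit succeeds
		Timeout:      1000,           // assumed to be in milliseconds
	}
}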
+ 0 - 94
types/types.go

@@ -1,94 +0,0 @@
-/*
-Package types provides access to the types and constants that the Kafka protocol uses,
-since they are needed by all levels of the Sarama stack.
-*/
-package types
-
-// KError is the type of error that can be returned directly by the Kafka broker.
-// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
-type KError int16
-
-const (
-	NO_ERROR                    KError = 0
-	UNKNOWN                     KError = -1
-	OFFSET_OUT_OF_RANGE         KError = 1
-	INVALID_MESSAGE             KError = 2
-	UNKNOWN_TOPIC_OR_PARTITION  KError = 3
-	INVALID_MESSAGE_SIZE        KError = 4
-	LEADER_NOT_AVAILABLE        KError = 5
-	NOT_LEADER_FOR_PARTITION    KError = 6
-	REQUEST_TIMED_OUT           KError = 7
-	BROKER_NOT_AVAILABLE        KError = 8
-	REPLICA_NOT_AVAILABLE       KError = 9
-	MESSAGE_SIZE_TOO_LARGE      KError = 10
-	STALE_CONTROLLER_EPOCH_CODE KError = 11
-	OFFSET_METADATA_TOO_LARGE   KError = 12
-)
-
-func (err KError) Error() string {
-	// Error messages stolen/adapted from
-	// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
-	switch err {
-	case NO_ERROR:
-		return "kafka server: Not an error, why are you printing me?"
-	case UNKNOWN:
-		return "kafka server: Unexpected (unknown?) server error."
-	case OFFSET_OUT_OF_RANGE:
-		return "kafka server: The requested offset is outside the range of offsets maintained by the server for the given topic/partition."
-	case INVALID_MESSAGE:
-		return "kafka server: Message contents does not match its CRC."
-	case UNKNOWN_TOPIC_OR_PARTITION:
-		return "kafka server: Request was for a topic or partition that does not exist on this broker."
-	case INVALID_MESSAGE_SIZE:
-		return "kafka server: The message has a negative size."
-	case LEADER_NOT_AVAILABLE:
-		return "kafka server: In the middle of a leadership election, there is currently no leader for this partition and hence it is unavailable for writes."
-	case NOT_LEADER_FOR_PARTITION:
-		return "kafka server: Tried to send a message to a replica that is not the leader for some partition. Your metadata is out of date."
-	case REQUEST_TIMED_OUT:
-		return "kafka server: Request exceeded the user-specified time limit in the request."
-	case BROKER_NOT_AVAILABLE:
-		return "kafka server: Broker not available. Not a client facing error, we should never receive this!!!"
-	case REPLICA_NOT_AVAILABLE:
-		return "kafka server: Replica not available. What is the difference between this and LeaderNotAvailable?"
-	case MESSAGE_SIZE_TOO_LARGE:
-		return "kafka server: Message was too large, server rejected it to avoid allocation error."
-	case STALE_CONTROLLER_EPOCH_CODE:
-		return "kafka server: Stale controller epoch code. ???"
-	case OFFSET_METADATA_TOO_LARGE:
-		return "kafka server: Specified a string larger than the configured maximum for offset metadata."
-	default:
-		return "Unknown error, how did this happen?"
-	}
-}
-
-// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
-type CompressionCodec int8
-
-const (
-	COMPRESSION_NONE   CompressionCodec = 0
-	COMPRESSION_GZIP   CompressionCodec = 1
-	COMPRESSION_SNAPPY CompressionCodec = 2
-)
-
-// RequiredAcks is used in Produce Requests to tell the broker how many replica acknowledgements
-// it must see before responding. Any positive int16 value is valid, or the constants defined here.
-type RequiredAcks int16
-
-const (
-	NO_RESPONSE    RequiredAcks = 0  // Don't send any response, the TCP ACK is all you get.
-	WAIT_FOR_LOCAL RequiredAcks = 1  // Wait for only the local commit to succeed before responding.
-	WAIT_FOR_ALL   RequiredAcks = -1 // Wait for all replicas to commit before responding.
-)
-
-// OffsetTime is used in Offset Requests to ask for all messages before a certain time. Any positive int64
-// value will be interpreted as milliseconds, or use the special constants defined here.
-type OffsetTime int64
-
-const (
-	// Ask for the latest offsets.
-	LATEST_OFFSETS OffsetTime = -1
-	// Ask for the earliest available offset. Note that because offsets are pulled in descending order,
-	// asking for the earliest offset will always return you a single element.
-	EARLIEST_OFFSET OffsetTime = -2
-)