Przeglądaj źródła

misc documentation

Evan Huus 12 lat temu
rodzic
commit
2fa26a24ae

+ 5 - 0
kafka/client.go

@@ -1,3 +1,8 @@
+/*
+Package kafka provides a high-level API for writing Kafka 0.8 clients.
+
+It is built strictly on sister package sarama/protocol.
+*/
 package kafka
 
 import k "sarama/protocol"

+ 0 - 4
kafka/kafka.go

@@ -1,4 +0,0 @@
-/*
-High-level bindings for Kafka 0.8 protocol.
-*/
-package kafka

+ 1 - 1
kafka/producer.go

@@ -74,7 +74,7 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
 		return err
 	}
 
-	request := &k.ProduceRequest{ResponseCondition: k.WAIT_FOR_LOCAL, Timeout: 0}
+	request := &k.ProduceRequest{RequiredAcks: k.WAIT_FOR_LOCAL, Timeout: 0}
 	request.AddMessage(p.topic, partition, &k.Message{Key: keyBytes, Value: valBytes})
 
 	response, err := broker.Produce(p.client.id, request)

+ 6 - 7
protocol/broker.go

@@ -6,7 +6,7 @@ import (
 	"sync"
 )
 
-// A single Kafka broker. All operations on this object are entirely concurrency-safe.
+// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
 type Broker struct {
 	id   int32
 	host string
@@ -26,7 +26,7 @@ type responsePromise struct {
 	errors         chan error
 }
 
-// Creates and returns a Broker targetting the given host:port address.
+// NewBroker creates and returns a Broker targetting the given host:port address.
 // This does not attempt to actually connect, you have to call Connect() for that.
 func NewBroker(host string, port int32) *Broker {
 	b := new(Broker)
@@ -36,7 +36,6 @@ func NewBroker(host string, port int32) *Broker {
 	return b
 }
 
-// Opens a connection to the remote broker.
 func (b *Broker) Connect() error {
 	b.lock.Lock()
 	defer b.lock.Unlock()
@@ -65,7 +64,6 @@ func (b *Broker) Connect() error {
 	return nil
 }
 
-// Closes the connection to the remote broker.
 func (b *Broker) Close() error {
 	b.lock.Lock()
 	defer b.lock.Unlock()
@@ -86,12 +84,13 @@ func (b *Broker) Close() error {
 	return err
 }
 
-// Returns the broker ID from Kafka, or -1 if that is not known.
+// ID returns the broker ID retrieved from Kafka's metadata, or -1 if that is not known.
 func (b *Broker) ID() int32 {
 	return b.id
 }
 
-// Two brokers are equal if they have the same host, port, and id.
+// Equals compares two brokers. Two brokers are considered equal if they have the same host, port, and id,
+// or if they are both nil.
 func (b *Broker) Equals(a *Broker) bool {
 	switch {
 	case a == nil && b == nil:
@@ -130,7 +129,7 @@ func (b *Broker) Produce(clientID string, request *ProduceRequest) (*ProduceResp
 	var response *ProduceResponse
 	var err error
 
-	if request.ResponseCondition == NO_RESPONSE {
+	if request.RequiredAcks == NO_RESPONSE {
 		err = b.sendAndReceive(clientID, request, nil)
 	} else {
 		response = new(ProduceResponse)

+ 23 - 0
protocol/doc.go

@@ -0,0 +1,23 @@
+/*
+Package protocol provides the low-level primitives necessary for communicating with a Kafka 0.8 cluster.
+
+The core of the package is the Broker. It represents a connection to a single Kafka broker service, and
+has methods for querying the broker.
+
+The other types are mostly Request types or Response types. Most of the Broker methods take a Request of a
+specific type and return a Response of the appropriate type, for example:
+
+	broker := NewBroker("localhost", 9092)
+	err := broker.Connect()
+	if err != nil {
+		return err
+	}
+
+	request := MetadataRequest{Topics:[]string{"myTopic"}}
+	response, err := broker.GetMetadata("myClient", request)
+
+	// do things with response
+
+	broker.Close()
+*/
+package protocol

+ 7 - 8
protocol/errors.go

@@ -5,7 +5,7 @@ import (
 	"fmt"
 )
 
-// The various errors that can be returned by the Kafka server.
+// KError is the type of error that can be returned directly by the Kafka broker.
 // See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
 type KError int16
 
@@ -63,15 +63,14 @@ func (err KError) Error() string {
 	}
 }
 
-// Error returned when calling Connect() on a Broker that is already connected.
+// AlreadyConnected is the error returned when calling Connect() on a Broker that is already connected.
 var AlreadyConnected = errors.New("kafka: broker: already connected")
 
-// Error returned when trying to send or call Close() on a Broker that is not connected.
+// NotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.
 var NotConnected = errors.New("kafka: broker: not connected")
 
-// Returned when there was an error encoding a Kafka packet. This can happen, for example,
-// if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do
-// not permit that.
+// EncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example,
+// if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that.
 type EncodingError string
 
 func (err EncodingError) Error() string {
@@ -87,8 +86,8 @@ func (err InsufficientData) Error() string {
 	return fmt.Sprintf("kafka: Insufficient data to decode packet, at least %d more bytes expected.", int(err))
 }
 
-// Returned when there was an error decoding the Kafka server's response. Usually means that you've
-// connected to the wrong address.
+// DecodingError is returned when there was an error (other than truncated data) decoding the Kafka broker's response.
+// This can be a bad CRC or length field, or any other invalid value.
 type DecodingError string
 
 func (err DecodingError) Error() string {

+ 1 - 7
protocol/message_set.go

@@ -38,7 +38,7 @@ func (msb *MessageBlock) decode(pd packetDecoder) (err error) {
 }
 
 type MessageSet struct {
-	PartialTrailingMessage bool
+	PartialTrailingMessage bool // whether the set on the wire contained an incomplete trailing MessageBlock
 	Messages               []*MessageBlock
 }
 
@@ -75,9 +75,3 @@ func (ms *MessageSet) addMessage(msg *Message) {
 	block.Msg = msg
 	ms.Messages = append(ms.Messages, block)
 }
-
-func newMessageSet() *MessageSet {
-	set := new(MessageSet)
-	set.Messages = make([]*MessageBlock, 0)
-	return set
-}

+ 6 - 6
protocol/produce_request.go

@@ -1,6 +1,6 @@
 package protocol
 
-// Special values accepted by Kafka for the ResponseCondition member of produce requests.
+// Special values accepted by Kafka for the RequiredAcks member of produce requests.
 const (
 	NO_RESPONSE    int16 = 0  // Don't send any response, the TCP ACK is all you get.
 	WAIT_FOR_LOCAL int16 = 1  // Wait for only the local commit to succeed before responding.
@@ -8,13 +8,13 @@ const (
 )
 
 type ProduceRequest struct {
-	ResponseCondition int16
-	Timeout           int32
-	msgSets           map[string]map[int32]*MessageSet
+	RequiredAcks int16
+	Timeout      int32
+	msgSets      map[string]map[int32]*MessageSet
 }
 
 func (p *ProduceRequest) encode(pe packetEncoder) {
-	pe.putInt16(p.ResponseCondition)
+	pe.putInt16(p.RequiredAcks)
 	pe.putInt32(p.Timeout)
 	pe.putArrayCount(len(p.msgSets))
 	for topic, partitions := range p.msgSets {
@@ -49,7 +49,7 @@ func (p *ProduceRequest) AddMessage(topic string, partition int32, msg *Message)
 	set := p.msgSets[topic][partition]
 
 	if set == nil {
-		set = newMessageSet()
+		set = new(MessageSet)
 		p.msgSets[topic][partition] = set
 	}
 

+ 1 - 1
protocol/produce_request_test.go

@@ -36,7 +36,7 @@ func TestProduceRequest(t *testing.T) {
 	request := new(ProduceRequest)
 	testEncodable(t, "empty", request, produceRequestEmpty)
 
-	request.ResponseCondition = 0x123
+	request.RequiredAcks = 0x123
 	request.Timeout = 0x444
 	testEncodable(t, "header", request, produceRequestHeader)
 

+ 0 - 4
protocol/protocol.go

@@ -1,4 +0,0 @@
-/*
-TODO
-*/
-package protocol

+ 2 - 2
sarama.go

@@ -1,8 +1,8 @@
 /*
 Package sarama provides client libraries for the Kafka 0.8 protocol.
 
-You probably want "sarama/kafka" which contains the high-level bindings.
+Package sarama is a dummy package, you almost certainly want sarama/kafka instead, which contains the high-level userspace API.
 
-Alternatively, "sarama/protocol" contains a lower-level API for more precise control.
+If not, sarama/protocol contains the low-level API that gives you exact control over what goes on the wire.
 */
 package sarama