
Split into two packages, one low-level, one high

Evan Huus, 11 years ago
commit 9047579808
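For orientation, a minimal sketch of what calling code might look like after this split. The import paths ("sarama/kafka", "sarama/protocol") are the GOPATH-era ones implied by the diff; the broker address, client id, and topic name are illustrative:

package main

import (
	"fmt"

	"sarama/kafka"      // high-level API: Client, Producer, Consumer, Encoder
	k "sarama/protocol" // low-level API: Broker, wire requests/responses, errors
)

func main() {
	id := "example-client"

	// NewClient and NewSimpleProducer have the signatures shown in the diff below.
	client, err := kafka.NewClient(&id, "localhost", 9092)
	if err != nil {
		panic(err)
	}

	producer := kafka.NewSimpleProducer(client, "my-topic")

	// Note that the high-level package returns low-level protocol types:
	// SendSimpleMessage yields a *protocol.ProduceResponse.
	var response *k.ProduceResponse
	response, err = producer.SendSimpleMessage("hello")
	if err != nil {
		panic(err)
	}
	fmt.Printf("produced: %+v\n", response)
}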

+ 5 - 3
client.go → kafka/client.go

@@ -1,5 +1,7 @@
 package kafka
 
+import k "sarama/protocol"
+
 type Client struct {
 	id    *string
 	cache *metadataCache
@@ -15,7 +17,7 @@ func NewClient(id *string, host string, port int32) (client *Client, err error)
 	return client, nil
 }
 
-func (client *Client) leader(topic string, partition_id int32) (*Broker, error) {
+func (client *Client) leader(topic string, partition_id int32) (*k.Broker, error) {
 	leader := client.cache.leader(topic, partition_id)
 
 	if leader == nil {
@@ -28,7 +30,7 @@ func (client *Client) leader(topic string, partition_id int32) (*Broker, error)
 	}
 
 	if leader == nil {
-		return nil, UNKNOWN_TOPIC_OR_PARTITION
+		return nil, k.UNKNOWN_TOPIC_OR_PARTITION
 	}
 
 	return leader, nil
@@ -47,7 +49,7 @@ func (client *Client) partitions(topic string) ([]int32, error) {
 	}
 
 	if partitions == nil {
-		return nil, UNKNOWN_TOPIC_OR_PARTITION
+		return nil, k.UNKNOWN_TOPIC_OR_PARTITION
 	}
 
 	return partitions, nil

+ 0 - 0
consumer.go → kafka/consumer.go


+ 7 - 0
kafka/encoder.go

@@ -0,0 +1,7 @@
+package kafka
+
+// A simple interface for any type that can be encoded as an array of bytes
+// in order to be sent as the key or value of a Kafka message.
+type Encoder interface {
+	Encode() ([]byte, error)
+}
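To illustrate the interface, a hypothetical implementation (the name StringEncoder is mine, not from this commit; the unexported encodableString used in producer.go presumably plays a similar role):

// StringEncoder is a hypothetical Encoder wrapping a plain string.
type StringEncoder string

// Encode satisfies Encoder by returning the string's raw bytes.
// Encoding a string cannot fail, so the error is always nil.
func (s StringEncoder) Encode() ([]byte, error) {
	return []byte(s), nil
}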

+ 4 - 0
kafka/kafka.go

@@ -0,0 +1,4 @@
+/*
+High-level bindings for Kafka 0.8 protocol.
+*/
+package kafka

+ 14 - 12
metadata_cache.go → kafka/metadata_cache.go

@@ -1,5 +1,7 @@
 package kafka
 
+import k "sarama/protocol"
+
 import (
 	"sort"
 	"sync"
@@ -7,7 +9,7 @@ import (
 
 type metadataCache struct {
 	client  *Client
-	brokers map[int32]*Broker          // maps broker ids to brokers
+	brokers map[int32]*k.Broker        // maps broker ids to brokers
 	leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
 	lock    sync.RWMutex               // protects access to the maps, only one since they're always accessed together
 }
@@ -15,14 +17,14 @@ type metadataCache struct {
 func newMetadataCache(client *Client, host string, port int32) (*metadataCache, error) {
 	mc := new(metadataCache)
 
-	starter := NewBroker(host, port)
+	starter := k.NewBroker(host, port)
 	err := starter.Connect()
 	if err != nil {
 		return nil, err
 	}
 
 	mc.client = client
-	mc.brokers = make(map[int32]*Broker)
+	mc.brokers = make(map[int32]*k.Broker)
 	mc.leaders = make(map[string]map[int32]int32)
 
 	mc.brokers[starter.ID()] = starter
@@ -36,7 +38,7 @@ func newMetadataCache(client *Client, host string, port int32) (*metadataCache,
 	return mc, nil
 }
 
-func (mc *metadataCache) removeBroker(broker *Broker) {
+func (mc *metadataCache) removeBroker(broker *k.Broker) {
 	if broker == nil {
 		return
 	}
@@ -48,7 +50,7 @@ func (mc *metadataCache) removeBroker(broker *Broker) {
 	go broker.Close()
 }
 
-func (mc *metadataCache) leader(topic string, partition_id int32) *Broker {
+func (mc *metadataCache) leader(topic string, partition_id int32) *k.Broker {
 	mc.lock.RLock()
 	defer mc.lock.RUnlock()
 
@@ -65,7 +67,7 @@ func (mc *metadataCache) leader(topic string, partition_id int32) *Broker {
 	return nil
 }
 
-func (mc *metadataCache) any() *Broker {
+func (mc *metadataCache) any() *k.Broker {
 	mc.lock.RLock()
 	defer mc.lock.RUnlock()
 
@@ -94,7 +96,7 @@ func (mc *metadataCache) partitions(topic string) []int32 {
 	return ret
 }
 
-func (mc *metadataCache) update(data *MetadataResponse) error {
+func (mc *metadataCache) update(data *k.MetadataResponse) error {
 	// connect to the brokers before taking the lock, as this can take a while
 	// to timeout if one of them isn't reachable
 	for _, broker := range data.Brokers {
@@ -115,12 +117,12 @@ func (mc *metadataCache) update(data *MetadataResponse) error {
 	}
 
 	for _, topic := range data.Topics {
-		if topic.Err != NO_ERROR {
+		if topic.Err != k.NO_ERROR {
 			return topic.Err
 		}
 		mc.leaders[*topic.Name] = make(map[int32]int32, len(topic.Partitions))
 		for _, partition := range topic.Partitions {
-			if partition.Err != NO_ERROR {
+			if partition.Err != k.NO_ERROR {
 				return partition.Err
 			}
 			mc.leaders[*topic.Name][partition.Id] = partition.Leader
@@ -132,13 +134,13 @@ func (mc *metadataCache) update(data *MetadataResponse) error {
 
 func (mc *metadataCache) refreshTopics(topics []*string) error {
 	for broker := mc.any(); broker != nil; broker = mc.any() {
-		response, err := broker.GetMetadata(mc.client.id, &MetadataRequest{topics})
+		response, err := broker.GetMetadata(mc.client.id, &k.MetadataRequest{Topics: topics})
 
 		switch err.(type) {
 		case nil:
 			// valid response, use it
 			return mc.update(response)
-		case EncodingError:
+		case k.EncodingError:
 			// didn't even send, return the error
 			return err
 		}
@@ -148,7 +150,7 @@ func (mc *metadataCache) refreshTopics(topics []*string) error {
 
 	}
 
-	return OutOfBrokers
+	return k.OutOfBrokers
 }
 
 func (mc *metadataCache) refreshTopic(topic string) error {

+ 0 - 0
partitioner.go → kafka/partitioner.go


+ 7 - 5
producer.go → kafka/producer.go

@@ -1,5 +1,7 @@
 package kafka
 
+import k "sarama/protocol"
+
 type Producer struct {
 	client            *Client
 	topic             string
@@ -13,7 +15,7 @@ func NewProducer(client *Client, topic string, partitioner Partitioner, response
 }
 
 func NewSimpleProducer(client *Client, topic string) *Producer {
-	return NewProducer(client, topic, RandomPartitioner{}, WAIT_FOR_LOCAL, 0)
+	return NewProducer(client, topic, RandomPartitioner{}, k.WAIT_FOR_LOCAL, 0)
 }
 
 func (p *Producer) choosePartition(key Encoder) (int32, error) {
@@ -32,7 +34,7 @@ func (p *Producer) choosePartition(key Encoder) (int32, error) {
 	return partitions[partitioner.Partition(key, len(partitions))], nil
 }
 
-func (p *Producer) SendMessage(key, value Encoder) (*ProduceResponse, error) {
+func (p *Producer) SendMessage(key, value Encoder) (*k.ProduceResponse, error) {
 	partition, err := p.choosePartition(key)
 	if err != nil {
 		return nil, err
@@ -57,8 +59,8 @@ func (p *Producer) SendMessage(key, value Encoder) (*ProduceResponse, error) {
 		return nil, err
 	}
 
-	request := &ProduceRequest{ResponseCondition: p.responseCondition, Timeout: p.responseTimeout}
-	request.AddMessage(&p.topic, partition, &Message{Key: keyBytes, Value: valBytes})
+	request := &k.ProduceRequest{ResponseCondition: p.responseCondition, Timeout: p.responseTimeout}
+	request.AddMessage(&p.topic, partition, &k.Message{Key: keyBytes, Value: valBytes})
 
 	response, err := broker.Produce(p.client.id, request)
 	if err != nil {
@@ -68,6 +70,6 @@ func (p *Producer) SendMessage(key, value Encoder) (*ProduceResponse, error) {
 	return response, nil
 }
 
-func (p *Producer) SendSimpleMessage(in string) (*ProduceResponse, error) {
+func (p *Producer) SendSimpleMessage(in string) (*k.ProduceResponse, error) {
 	return p.SendMessage(nil, encodableString(in))
 }
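Continuing the sketches above, a keyed send through the new API might look like this (StringEncoder is the hypothetical type from the encoder.go example; the key and value are illustrative):

// The key is run through the producer's Partitioner to pick a partition,
// and both key and value are serialized via their Encode() methods.
func sendKeyed(producer *kafka.Producer) (*k.ProduceResponse, error) {
	return producer.SendMessage(
		StringEncoder("user-42"), // key
		StringEncoder("payload"), // value
	)
}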

+ 0 - 0
utils.go → kafka/utils.go


+ 1 - 1
broker.go → protocol/broker.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 import (
 	"io"

+ 1 - 9
encoder_decoder.go → protocol/encoder_decoder.go

@@ -1,12 +1,4 @@
-package kafka
-
-// Public Encoding
-
-// A simple interface for any type that can be encoded as an array of bytes
-// in order to be sent as the key or value of a Kafka message.
-type Encoder interface {
-	Encode() ([]byte, error)
-}
+package protocol
 
 // Kafka Encoding
 

+ 1 - 1
errors.go → protocol/errors.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 import "errors"
 

+ 1 - 1
fetch_request.go → protocol/fetch_request.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 type fetchRequestBlock struct {
 	fetchOffset int64

+ 1 - 1
fetch_response.go → protocol/fetch_response.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 type FetchResponseBlock struct {
 	Err                 KError

+ 1 - 1
message.go → protocol/message.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 import (
 	"bytes"

+ 1 - 1
message_set.go → protocol/message_set.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 type MessageBlock struct {
 	Offset int64

+ 1 - 1
metadata_request.go → protocol/metadata_request.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 type MetadataRequest struct {
 	Topics []*string

+ 1 - 1
metadata_response.go → protocol/metadata_response.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 type PartitionMetadata struct {
 	Err      KError

+ 1 - 1
offset_commit_request.go → protocol/offset_commit_request.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 type offsetCommitRequestBlock struct {
 	offset   int64

+ 1 - 1
offset_commit_response.go → protocol/offset_commit_response.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 type OffsetCommitResponse struct {
 	Errors map[*string]map[int32]KError

+ 1 - 1
offset_fetch_request.go → protocol/offset_fetch_request.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 type OffsetFetchRequest struct {
 	ConsumerGroup *string

+ 1 - 1
offset_fetch_response.go → protocol/offset_fetch_response.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 type OffsetFetchResponseBlock struct {
 	Offset   int64

+ 1 - 1
offset_request.go → protocol/offset_request.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 // Special values accepted by Kafka for the 'time' parameter of OffsetRequest.AddBlock().
 const (

+ 1 - 1
offset_response.go → protocol/offset_response.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 type OffsetResponseBlock struct {
 	err     KError

+ 1 - 1
packet_crcs.go → protocol/packet_crcs.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 import (
 	"encoding/binary"

+ 1 - 1
packet_decoder.go → protocol/packet_decoder.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 type packetDecoder interface {
 	remaining() int

+ 1 - 1
packet_encoder.go → protocol/packet_encoder.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 type packetEncoder interface {
 	// primitives

+ 1 - 1
packet_lengths.go → protocol/packet_lengths.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 import "encoding/binary"
 

+ 1 - 1
prep_encoder.go → protocol/prep_encoder.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 import "math"
 

+ 1 - 1
produce_request.go → protocol/produce_request.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 // Special values accepted by Kafka for the ResponseCondition member of produce requests.
 const (

+ 1 - 1
produce_response.go → protocol/produce_response.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 type ProduceResponseBlock struct {
 	Err    KError

+ 4 - 0
protocol/protocol.go

@@ -0,0 +1,4 @@
+/*
+TODO
+*/
+package protocol

+ 1 - 1
real_decoder.go → protocol/real_decoder.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 import (
 	"encoding/binary"

+ 1 - 1
real_encoder.go → protocol/real_encoder.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 import "encoding/binary"
 

+ 1 - 1
request.go → protocol/request.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 type requestEncoder interface {
 	encoder

+ 1 - 1
response_header.go → protocol/response_header.go

@@ -1,4 +1,4 @@
-package kafka
+package protocol
 
 import "math"
 

+ 4 - 6
sarama.go

@@ -1,10 +1,8 @@
 /*
-Package kafka (AKA sarama.go) provides client libraries for the Kafka 0.8 protocol.
+Package sarama provides client libraries for the Kafka 0.8 protocol.
 
-It provides a high-level API to make common tasks easy, as well as a low-level API for precise control
-over message batching etc. The high-level API consists of Client, Producer, Consumer, Encoder.
+You probably want "sarama/kafka" which contains the high-level bindings.
 
-If you need more control, you can connect to Kafka brokers directly using the Broker object,
-then send requests and receive responses using functions on the broker itself.
+Alternatively, "sarama/protocol" contains a lower-level API for more precise control.
 */
-package kafka
+package sarama
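For the lower-level path this doc comment points at, a sketch using only calls visible elsewhere in this commit (NewBroker, Connect, Close, GetMetadata); the address, client id, and topic are illustrative:

package main

import k "sarama/protocol"

// fetchMetadata talks to one broker directly, bypassing the high-level
// Client and its metadataCache.
func fetchMetadata() error {
	broker := k.NewBroker("localhost", 9092)
	if err := broker.Connect(); err != nil {
		return err
	}
	defer broker.Close()

	id := "example-client"
	topic := "my-topic"
	response, err := broker.GetMetadata(&id, &k.MetadataRequest{Topics: []*string{&topic}})
	if err != nil {
		return err
	}

	// response.Brokers and response.Topics mirror the structures that
	// metadata_cache.go consumes in its update() method.
	for _, b := range response.Brokers {
		_ = b
	}
	return nil
}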