浏览代码

More API slimming, publish ProduceResponse

Evan Huus 12 年之前
父节点
当前提交
eeb9b0926c
共有 4 个文件被更改,包括 33 次插入59 次删除
  1. 1 1
      broker_manager.go
  2. 22 42
      produce_response.go
  3. 10 6
      producer.go
  4. 0 10
      response.go

+ 1 - 1
broker_manager.go

@@ -104,7 +104,7 @@ func (bm *brokerManager) partitionsForTopic(topic string) ([]int32, error) {
 	return partitions, nil
 }
 
-func (bm *brokerManager) sendToPartition(topic string, partition int32, req requestEncoder, res responseDecoder) error {
+func (bm *brokerManager) sendToPartition(topic string, partition int32, req requestEncoder, res decoder) error {
 	b, err := bm.getLeader(topic, partition)
 	if err != nil {
 		return err

+ 22 - 42
produce_response.go

@@ -1,23 +1,23 @@
 package kafka
 
-type produceResponsePartitionBlock struct {
-	id     int32
-	err    KError
-	offset int64
+type ProduceResponsePartitionBlock struct {
+	Id     int32
+	Err    KError
+	Offset int64
 }
 
-func (pr *produceResponsePartitionBlock) decode(pd packetDecoder) (err error) {
-	pr.id, err = pd.getInt32()
+func (pr *ProduceResponsePartitionBlock) decode(pd packetDecoder) (err error) {
+	pr.Id, err = pd.getInt32()
 	if err != nil {
 		return err
 	}
 
-	pr.err, err = pd.getError()
+	pr.Err, err = pd.getError()
 	if err != nil {
 		return err
 	}
 
-	pr.offset, err = pd.getInt64()
+	pr.Offset, err = pd.getInt64()
 	if err != nil {
 		return err
 	}
@@ -25,13 +25,13 @@ func (pr *produceResponsePartitionBlock) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
-type produceResponseTopicBlock struct {
-	name       *string
-	partitions []produceResponsePartitionBlock
+type ProduceResponseTopicBlock struct {
+	Name       *string
+	Partitions []ProduceResponsePartitionBlock
 }
 
-func (pr *produceResponseTopicBlock) decode(pd packetDecoder) (err error) {
-	pr.name, err = pd.getString()
+func (pr *ProduceResponseTopicBlock) decode(pd packetDecoder) (err error) {
+	pr.Name, err = pd.getString()
 	if err != nil {
 		return err
 	}
@@ -41,9 +41,9 @@ func (pr *produceResponseTopicBlock) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
-	pr.partitions = make([]produceResponsePartitionBlock, n)
-	for i := range pr.partitions {
-		err = (&pr.partitions[i]).decode(pd)
+	pr.Partitions = make([]ProduceResponsePartitionBlock, n)
+	for i := range pr.Partitions {
+		err = (&pr.Partitions[i]).decode(pd)
 		if err != nil {
 			return err
 		}
@@ -52,19 +52,19 @@ func (pr *produceResponseTopicBlock) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
-type produceResponse struct {
-	topics []produceResponseTopicBlock
+type ProduceResponse struct {
+	Topics []ProduceResponseTopicBlock
 }
 
-func (pr *produceResponse) decode(pd packetDecoder) (err error) {
+func (pr *ProduceResponse) decode(pd packetDecoder) (err error) {
 	n, err := pd.getArrayCount()
 	if err != nil {
 		return err
 	}
 
-	pr.topics = make([]produceResponseTopicBlock, n)
-	for i := range pr.topics {
-		err = (&pr.topics[i]).decode(pd)
+	pr.Topics = make([]ProduceResponseTopicBlock, n)
+	for i := range pr.Topics {
+		err = (&pr.Topics[i]).decode(pd)
 		if err != nil {
 			return err
 		}
@@ -72,23 +72,3 @@ func (pr *produceResponse) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
-
-func (pr *produceResponse) staleTopics() []*string {
-	ret := make([]*string, 0)
-
-	for i := range pr.topics {
-		topic := &pr.topics[i]
-
-	currentTopic:
-		for j := range topic.partitions {
-			partition := &topic.partitions[j]
-			switch partition.err {
-			case UNKNOWN, UNKNOWN_TOPIC_OR_PARTITION, LEADER_NOT_AVAILABLE, NOT_LEADER_FOR_PARTITION:
-				ret = append(ret, topic.name)
-				break currentTopic
-			}
-		}
-	}
-
-	return ret
-}

+ 10 - 6
producer.go

@@ -16,10 +16,10 @@ func NewSimpleProducer(client *Client, topic string) *Producer {
 	return NewProducer(client, topic, RandomPartitioner{}, WAIT_FOR_LOCAL, 0)
 }
 
-func (p *Producer) SendMessage(key, value encoder) error {
+func (p *Producer) SendMessage(key, value encoder) (*ProduceResponse, error) {
 	partitions, err := p.client.brokers.partitionsForTopic(p.topic)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	var partitioner PartitionChooser
@@ -32,16 +32,20 @@ func (p *Producer) SendMessage(key, value encoder) error {
 
 	msg, err := newMessage(key, value)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	request := newSingletonProduceRequest(p.topic, partition, newSingletonMessageSet(msg))
 	request.requiredAcks = p.responseCondition
 	request.timeout = p.responseTimeout
 
-	err = p.client.brokers.sendToPartition(p.topic, partition, request, &produceResponse{})
+	var response *ProduceResponse
+	if request.expectResponse() {
+		response = new(ProduceResponse)
+	}
+	err = p.client.brokers.sendToPartition(p.topic, partition, request, response)
 
-	return err
+	return response, err
 }
 
 type encodableString string
@@ -50,6 +54,6 @@ func (s encodableString) encode(pe packetEncoder) {
 	pe.putRaw([]byte(s))
 }
 
-func (p *Producer) SendSimpleMessage(in string) error {
+func (p *Producer) SendSimpleMessage(in string) (*ProduceResponse, error) {
 	return p.SendMessage(nil, encodableString(in))
 }

+ 0 - 10
response.go

@@ -1,10 +0,0 @@
-package kafka
-
-type responseAPI interface {
-	staleTopics() []*string
-}
-
-type responseDecoder interface {
-	decoder
-	responseAPI
-}