Browse Source

Refactor. Refresh topic metadata in more cases.

Specifically, if we get a valid, parsable response that contains an error like
NotLeaderForPartition.
Evan Huus 12 years ago
parent
commit
39be3b4fee
9 changed files with 100 additions and 40 deletions
  1. 0 11
      api.go
  2. 9 6
      broker.go
  3. 27 21
      broker_manager.go
  4. 4 0
      metadata_request.go
  5. 10 0
      produce_request.go
  6. 20 0
      produce_response.go
  7. 8 1
      producer.go
  8. 12 1
      request.go
  9. 10 0
      response.go

+ 0 - 11
api.go

@@ -1,11 +0,0 @@
-package kafka
-
-type api interface {
-	key() int16
-	version() int16
-}
-
-type apiEncoder interface {
-	encoder
-	api
-}

+ 9 - 6
broker.go

@@ -164,7 +164,7 @@ func (b *broker) rcvResponseLoop() {
 	}
 	}
 }
 }
 
 
-func (b *broker) sendRequest(clientID *string, body apiEncoder, expectResponse bool) (*responsePromise, error) {
+func (b *broker) sendRequest(clientID *string, body requestEncoder) (*responsePromise, error) {
 	var prepEnc prepEncoder
 	var prepEnc prepEncoder
 	var realEnc realEncoder
 	var realEnc realEncoder
 
 
@@ -179,7 +179,7 @@ func (b *broker) sendRequest(clientID *string, body apiEncoder, expectResponse b
 	realEnc.putInt32(int32(prepEnc.length))
 	realEnc.putInt32(int32(prepEnc.length))
 	req.encode(&realEnc)
 	req.encode(&realEnc)
 
 
-	request := requestToSend{responsePromise{b.correlation_id, make(chan []byte), make(chan error)}, expectResponse}
+	request := requestToSend{responsePromise{b.correlation_id, make(chan []byte), make(chan error)}, body.expectResponse()}
 
 
 	b.requests <- request
 	b.requests <- request
 	request.response.packets <- realEnc.raw // we cheat to avoid poofing up more channels than necessary
 	request.response.packets <- realEnc.raw // we cheat to avoid poofing up more channels than necessary
@@ -187,10 +187,12 @@ func (b *broker) sendRequest(clientID *string, body apiEncoder, expectResponse b
 	return &request.response, nil
 	return &request.response, nil
 }
 }
 
 
-func (b *broker) sendAndReceive(clientID *string, req apiEncoder, res decoder) error {
-	responseChan, err := b.sendRequest(clientID, req, res != nil)
+// returns true if there was a response, even if there was an error decoding it (in
+// which case it will also return an error of some sort)
+func (b *broker) sendAndReceive(clientID *string, req requestEncoder, res decoder) (bool, error) {
+	responseChan, err := b.sendRequest(clientID, req)
 	if err != nil {
 	if err != nil {
-		return err
+		return false, err
 	}
 	}
 
 
 	select {
 	select {
@@ -199,9 +201,10 @@ func (b *broker) sendAndReceive(clientID *string, req apiEncoder, res decoder) e
 		if buf != nil {
 		if buf != nil {
 			decoder := realDecoder{raw: buf}
 			decoder := realDecoder{raw: buf}
 			err = res.decode(&decoder)
 			err = res.decode(&decoder)
+			return true, err
 		}
 		}
 	case err = <-responseChan.errors:
 	case err = <-responseChan.errors:
 	}
 	}
 
 
-	return err
+	return false, err
 }
 }

+ 27 - 21
broker_manager.go

@@ -104,22 +104,34 @@ func (bm *brokerManager) choosePartition(topic string, p partitionChooser) (int3
 	return p.choosePartition(partitions), nil
 	return p.choosePartition(partitions), nil
 }
 }
 
 
-func (bm *brokerManager) sendToPartition(topic string, partition int32, req apiEncoder, res decoder) error {
+func (bm *brokerManager) sendToPartition(topic string, partition int32, req requestEncoder, res responseDecoder) (bool, error) {
 	b, err := bm.getValidLeader(topic, partition)
 	b, err := bm.getValidLeader(topic, partition)
 	if err != nil {
 	if err != nil {
-		return err
+		return false, err
 	}
 	}
 
 
-	err = b.sendAndReceive(bm.client.id, req, res)
+	gotResponse, err := b.sendAndReceive(bm.client.id, req, res)
 	switch err.(type) {
 	switch err.(type) {
-	case nil:
-		// successfully received and decoded the packet, we're done
-		// (the actual decoded data is stored in the res parameter)
-		return nil
 	case EncodingError:
 	case EncodingError:
 		// encoding errors are our problem, not the broker's, so just return them
 		// encoding errors are our problem, not the broker's, so just return them
 		// rather than refreshing the broker metadata
 		// rather than refreshing the broker metadata
-		return err
+		return false, err
+	case nil:
+		// no error, did we get a response?
+		if gotResponse {
+			// yes, so check for stale topics that may require a resend
+			stale := res.staleTopics()
+			if len(stale) == 0 {
+				return true, nil
+			}
+			err = bm.refreshTopics(stale)
+			if err != nil {
+				return true, err
+			}
+		} else {
+			// no, so we have to assume it worked
+			return false, nil
+		}
 	default:
 	default:
 		// broker error, so discard that broker
 		// broker error, so discard that broker
 		bm.terminateBroker(b.id)
 		bm.terminateBroker(b.id)
@@ -131,7 +143,7 @@ func (bm *brokerManager) sendToPartition(topic string, partition int32, req apiE
 	// we pass that error back to the user (as opposed to retrying indefinitely)
 	// we pass that error back to the user (as opposed to retrying indefinitely)
 	b, err = bm.getValidLeader(topic, partition)
 	b, err = bm.getValidLeader(topic, partition)
 	if err != nil {
 	if err != nil {
-		return err
+		return false, err
 	}
 	}
 
 
 	return b.sendAndReceive(bm.client.id, req, res)
 	return b.sendAndReceive(bm.client.id, req, res)
@@ -151,30 +163,24 @@ func (bm *brokerManager) getDefault() *broker {
 	return bm.defaultBroker
 	return bm.defaultBroker
 }
 }
 
 
-func (bm *brokerManager) sendToAny(req apiEncoder, res decoder) error {
+func (bm *brokerManager) sendToAny(req requestEncoder, res decoder) (bool, error) {
 	for b := bm.getDefault(); b != nil; b = bm.getDefault() {
 	for b := bm.getDefault(); b != nil; b = bm.getDefault() {
-		err := b.sendAndReceive(bm.client.id, req, res)
+		gotResponse, err := b.sendAndReceive(bm.client.id, req, res)
 		switch err.(type) {
 		switch err.(type) {
-		case nil:
-			// successfully received and decoded the packet, we're done
-			// (the actual decoded data is stored in the res parameter)
-			return nil
-		case EncodingError:
-			// encoding errors are our problem, not the broker's, so just return them
-			// rather than trying another broker
-			return err
+		case nil, EncodingError:
+			return gotResponse, err
 		default:
 		default:
 			// broker error, so discard that broker
 			// broker error, so discard that broker
 			bm.defaultBroker = nil
 			bm.defaultBroker = nil
 			bm.terminateBroker(b.id)
 			bm.terminateBroker(b.id)
 		}
 		}
 	}
 	}
-	return OutOfBrokers{}
+	return false, OutOfBrokers{}
 }
 }
 
 
 func (bm *brokerManager) refreshTopics(topics []*string) error {
 func (bm *brokerManager) refreshTopics(topics []*string) error {
 	response := new(metadata)
 	response := new(metadata)
-	err := bm.sendToAny(&metadataRequest{topics}, response)
+	_, err := bm.sendToAny(&metadataRequest{topics}, response)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}

+ 4 - 0
metadata_request.go

@@ -35,3 +35,7 @@ func (mr *metadataRequest) key() int16 {
 func (mr *metadataRequest) version() int16 {
 func (mr *metadataRequest) version() int16 {
 	return 0
 	return 0
 }
 }
+
+func (mr *metadataRequest) expectResponse() bool {
+	return true
+}

+ 10 - 0
produce_request.go

@@ -25,6 +25,12 @@ func (p *produceRequestTopicBlock) encode(pe packetEncoder) {
 	}
 	}
 }
 }
 
 
+const (
+	NO_RESPONSE    int16 = 0
+	WAIT_FOR_LOCAL int16 = 1
+	WAIT_FOR_ALL   int16 = -1
+)
+
 type produceRequest struct {
 type produceRequest struct {
 	requiredAcks int16
 	requiredAcks int16
 	timeout      int32
 	timeout      int32
@@ -48,6 +54,10 @@ func (p *produceRequest) version() int16 {
 	return 0
 	return 0
 }
 }
 
 
+func (p *produceRequest) expectResponse() bool {
+	return p.requiredAcks != NO_RESPONSE
+}
+
 func newSingletonProduceRequest(topic string, partition int32, set *messageSet) *produceRequest {
 func newSingletonProduceRequest(topic string, partition int32, set *messageSet) *produceRequest {
 	req := &produceRequest{topics: make([]produceRequestTopicBlock, 1)}
 	req := &produceRequest{topics: make([]produceRequestTopicBlock, 1)}
 	req.topics[0].topic = &topic
 	req.topics[0].topic = &topic

+ 20 - 0
produce_response.go

@@ -72,3 +72,23 @@ func (pr *produceResponse) decode(pd packetDecoder) (err error) {
 
 
 	return nil
 	return nil
 }
 }
+
+func (pr *produceResponse) staleTopics() []*string {
+	ret := make([]*string, 0)
+
+	for i := range pr.topics {
+		topic := &pr.topics[i]
+
+	currentTopic:
+		for j := range topic.partitions {
+			partition := &topic.partitions[j]
+			switch partition.err {
+			case UNKNOWN, UNKNOWN_TOPIC_OR_PARTITION, LEADER_NOT_AVAILABLE, NOT_LEADER_FOR_PARTITION:
+				ret = append(ret, topic.name)
+				break currentTopic
+			}
+		}
+	}
+
+	return ret
+}

+ 8 - 1
producer.go

@@ -16,6 +16,13 @@ func (p *Producer) SendSimpleMessage(in string) error {
 	}
 	}
 
 
 	request := newSingletonProduceRequest(p.topic, partition, newSingletonMessageSet(newMessageFromString(in)))
 	request := newSingletonProduceRequest(p.topic, partition, newSingletonMessageSet(newMessageFromString(in)))
+	request.requiredAcks = WAIT_FOR_LOCAL
 
 
-	return p.client.brokers.sendToPartition(p.topic, partition, request, nil)
+	response := produceResponse{}
+
+	_, err = p.client.brokers.sendToPartition(p.topic, partition, request, &response)
+	if err != nil {
+		return err
+	}
+	return nil
 }
 }

+ 12 - 1
request.go

@@ -1,9 +1,20 @@
 package kafka
 package kafka
 
 
+type requestAPI interface {
+	key() int16
+	version() int16
+	expectResponse() bool
+}
+
+type requestEncoder interface {
+	encoder
+	requestAPI
+}
+
 type request struct {
 type request struct {
 	correlation_id int32
 	correlation_id int32
 	id             *string
 	id             *string
-	body           apiEncoder
+	body           requestEncoder
 }
 }
 
 
 func (r *request) encode(pe packetEncoder) {
 func (r *request) encode(pe packetEncoder) {

+ 10 - 0
response.go

@@ -0,0 +1,10 @@
+package kafka
+
+type responseAPI interface {
+	staleTopics() []*string
+}
+
+type responseDecoder interface {
+	decoder
+	responseAPI
+}