Browse Source

checkpoint wip broker recovery

Evan Huus 12 years ago
parent
commit
4b9ba1b53c
2 changed files with 36 additions and 16 deletions
  1. 23 3
      brokerManager.go
  2. 13 13
      errors.go

+ 23 - 3
brokerManager.go

@@ -40,10 +40,30 @@ func newBrokerManager(client *Client, host string, port int32) (bm *brokerManage
 	return bm, nil
 }
 
-func (bm *brokerManager) lookupLeader(topic string, partition int32) *broker {
+func (bm *brokerManager) lookupLeader(topic string, partition int32) (*broker, error) {
+	var broker *broker = nil
 	bm.brokersLock.RLock()
-	defer bm.brokersLock.RUnlock()
-	return bm.brokers[bm.leaders[topicPartition{topic, partition}]]
+	id, ok := bm.leaders[topicPartition{topic, partition}]
+	if ok {
+		broker = bm.brokers[id]
+	}
+	bm.brokersLock.RUnlock()
+
+	if broker == nil {
+		err := bm.refreshTopic(topic)
+		if err != nil {
+			return nil, err
+		}
+		bm.brokersLock.RLock()
+		broker = bm.brokers[bm.leaders[topicPartition{topic, partition}]]
+		bm.brokersLock.RUnlock()
+	}
+
+	if broker == nil {
+		return nil, UNKNOWN_TOPIC_OR_PARTITION
+	}
+
+	return broker, nil
 }
 
 func (bm *brokerManager) getDefault() *broker {

+ 13 - 13
errors.go

@@ -4,19 +4,19 @@ type KError int16
 
 const (
 	NO_ERROR                    KError = 0
-	UNKNOWN                            = -1
-	OFFSET_OUT_OF_RANGE                = 1
-	INVALID_MESSAGE                    = 2
-	UNKNOWN_TOPIC_OR_PARTITION         = 3
-	INVALID_MESSAGE_SIZE               = 4
-	LEADER_NOT_AVAILABLE               = 5
-	NOT_LEADER_FOR_PARTITION           = 6
-	REQUEST_TIMED_OUT                  = 7
-	BROKER_NOT_AVAILABLE               = 8
-	REPLICA_NOT_AVAILABLE              = 9
-	MESSAGE_SIZE_TOO_LARGE             = 10
-	STALE_CONTROLLER_EPOCH_CODE        = 11
-	OFFSET_METADATA_TOO_LARGE          = 12
+	UNKNOWN                     KError = -1
+	OFFSET_OUT_OF_RANGE         KError = 1
+	INVALID_MESSAGE             KError = 2
+	UNKNOWN_TOPIC_OR_PARTITION  KError = 3
+	INVALID_MESSAGE_SIZE        KError = 4
+	LEADER_NOT_AVAILABLE        KError = 5
+	NOT_LEADER_FOR_PARTITION    KError = 6
+	REQUEST_TIMED_OUT           KError = 7
+	BROKER_NOT_AVAILABLE        KError = 8
+	REPLICA_NOT_AVAILABLE       KError = 9
+	MESSAGE_SIZE_TOO_LARGE      KError = 10
+	STALE_CONTROLLER_EPOCH_CODE KError = 11
+	OFFSET_METADATA_TOO_LARGE   KError = 12
 )
 
 func (err KError) Error() string {