Pārlūkot izejas kodu

checkpoint wip broker recovery

Evan Huus 12 gadi atpakaļ
vecāks
revīzija
4707652d5f
4 mainītis faili ar 87 papildinājumiem un 80 dzēšanām
  1. 43 45
      broker.go
  2. 43 23
      brokerManager.go
  3. 1 1
      encoderDecoder.go
  4. 0 11
      metadata.go

+ 43 - 45
broker.go

@@ -2,32 +2,34 @@ package kafka
 
 import (
 	"encoding/binary"
+	"io"
 	"math"
 	"net"
 )
 
 type broker struct {
-	nodeId int32
-	host   *string
-	port   int32
+	id   int32
+	host *string
+	port int32
 
 	correlation_id int32
 
 	conn net.Conn
 	addr net.TCPAddr
 
-	requests  chan reqResPair
-	responses chan reqResPair
+	requests  chan responsePromise
+	responses chan responsePromise
 }
 
-type reqResPair struct {
+type responsePromise struct {
 	correlation_id int32
 	packets        chan []byte
+	errors         chan error
 }
 
 func newBroker(host string, port int32) (b *broker, err error) {
 	b = new(broker)
-	b.nodeId = -1 // don't know it yet
+	b.id = -1 // don't know it yet
 	b.host = &host
 	b.port = port
 	err = b.connect()
@@ -53,30 +55,30 @@ func (b *broker) connect() (err error) {
 	}
 
 	go b.sendRequestLoop()
-	go b.rcvResponseLoop()
+	go b.rcvresponsePromiseLoop()
 
 	return nil
 }
 
-func (b *broker) disconnect() {
-	close(b.requests)
-	b.requests = nil
+func (b *broker) forceDisconnect(reqRes *responsePromise, err error) {
+	reqRes.errors <- err
+	close(reqRes.errors)
+	close(reqRes.packets)
 
+	close(b.requests)
 	close(b.responses)
-	b.responses = nil
 
 	b.conn.Close()
-	b.conn = nil
 }
 
 func (b *broker) encode(pe packetEncoder) {
-	pe.putInt32(b.nodeId)
+	pe.putInt32(b.id)
 	pe.putString(b.host)
 	pe.putInt32(b.port)
 }
 
 func (b *broker) decode(pd packetDecoder) (err error) {
-	b.nodeId, err = pd.getInt32()
+	b.id, err = pd.getInt32()
 	if err != nil {
 		return err
 	}
@@ -100,61 +102,56 @@ func (b *broker) decode(pd packetDecoder) (err error) {
 }
 
 func (b *broker) sendRequestLoop() {
-	var n int
-	var err error
-	var buf []byte
 	for request := range b.requests {
-		buf = <-request.packets
-		n, err = b.conn.Write(buf)
-		if err != nil || n != len(buf) {
-			b.disconnect()
+		buf := <-request.packets
+		_, err := b.conn.Write(buf)
+		if err != nil {
+			b.forceDisconnect(&request, err)
 			return
 		}
 		b.responses <- request
 	}
 }
 
-func (b *broker) rcvResponseLoop() {
-	var n int
-	var length int32
-	var err error
-	var buf []byte
+func (b *broker) rcvresponsePromiseLoop() {
 	header := make([]byte, 4)
 	for response := range b.responses {
-		n, err = b.conn.Read(header)
-		if err != nil || n != 4 {
-			b.disconnect()
+		_, err := io.ReadFull(b.conn, header)
+		if err != nil {
+			b.forceDisconnect(&response, err)
 			return
 		}
-		length = int32(binary.BigEndian.Uint32(header))
+
+		length := int32(binary.BigEndian.Uint32(header))
 		if length <= 4 || length > 2*math.MaxUint16 {
-			b.disconnect()
+			b.forceDisconnect(&response, DecodingError{})
 			return
 		}
 
-		n, err = b.conn.Read(header)
-		if err != nil || n != 4 {
-			b.disconnect()
+		_, err = io.ReadFull(b.conn, header)
+		if err != nil {
+			b.forceDisconnect(&response, err)
 			return
 		}
 		if response.correlation_id != int32(binary.BigEndian.Uint32(header)) {
-			b.disconnect()
+			b.forceDisconnect(&response, DecodingError{})
 			return
 		}
 
-		buf = make([]byte, length-4)
-		n, err = b.conn.Read(buf)
-		if err != nil || n != int(length-4) {
-			b.disconnect()
+		buf := make([]byte, length-4)
+		_, err = io.ReadFull(b.conn, buf)
+		if err != nil {
+			b.forceDisconnect(&response, err)
 			return
 		}
 
 		response.packets <- buf
 		close(response.packets)
+		close(response.errors)
 	}
 }
 
-func (b *broker) sendRequest(clientID *string, body encoder) (chan []byte, error) {
+func (b *broker) sendRequest(clientID *string, body encoder) (*responsePromise, error) {
 	var prepEnc prepEncoder
 	var realEnc realEncoder
 	var api API
@@ -177,12 +174,13 @@ func (b *broker) sendRequest(clientID *string, body encoder) (chan []byte, error
 	realEnc.putInt32(int32(prepEnc.length))
 	req.encode(&realEnc)
 
-	// we buffer one packet so that we can send our packet to the request queue without
-	// blocking, and so that the responses can be sent to us async if we want them
-	request := reqResPair{b.correlation_id, make(chan []byte, 1)}
+	// we buffer one packet and one error so that all this can work async if the
+	// caller so desires. we also cheat and use the same responsePromise object for both the
+	// request and the response, as things are much simpler that way
+	request := responsePromise{b.correlation_id, make(chan []byte, 1), make(chan error, 1)}
 
 	request.packets <- realEnc.raw
 	b.requests <- request
 	b.correlation_id++
-	return request.packets, nil
+	return &request, nil
 }

+ 43 - 23
brokerManager.go

@@ -2,7 +2,7 @@ package kafka
 
 import "sync"
 
-type brokerKey struct {
+type topicPartition struct {
 	topic     string
 	partition int32
 }
@@ -10,8 +10,10 @@ type brokerKey struct {
 type brokerManager struct {
 	client        *Client
 	defaultBroker *broker
-	leaders       map[brokerKey]*broker
-	leadersLock   sync.RWMutex
+
+	brokers     map[int32]*broker
+	leaders     map[topicPartition]int32
+	brokersLock sync.RWMutex
 }
 
 func newBrokerManager(client *Client, host string, port int32) (bm *brokerManager, err error) {
@@ -26,7 +28,8 @@ func newBrokerManager(client *Client, host string, port int32) (bm *brokerManage
 		return nil, err
 	}
 
-	bm.leaders = make(map[brokerKey]*broker)
+	bm.brokers = make(map[int32]*broker)
+	bm.leaders = make(map[topicPartition]int32)
 
 	// do an initial fetch of all cluster metadata by specifing an empty list of topics
 	err = bm.refreshTopics(make([]*string, 0))
@@ -38,17 +41,18 @@ func newBrokerManager(client *Client, host string, port int32) (bm *brokerManage
 }
 
 func (bm *brokerManager) lookupLeader(topic string, partition int32) *broker {
-	bm.leadersLock.RLock()
-	defer bm.leadersLock.RUnlock()
-	return bm.leaders[brokerKey{topic, partition}]
+	bm.brokersLock.RLock()
+	defer bm.brokersLock.RUnlock()
+	return bm.brokers[bm.leaders[topicPartition{topic, partition}]]
 }
 
 func (bm *brokerManager) getDefault() *broker {
 
 	if bm.defaultBroker == nil {
-		bm.leadersLock.RLock()
-		defer bm.leadersLock.RUnlock()
-		for _, bm.defaultBroker = range bm.leaders {
+		bm.brokersLock.RLock()
+		defer bm.brokersLock.RUnlock()
+		for _, id := range bm.leaders {
+			bm.defaultBroker = bm.brokers[id]
 			break
 		}
 	}
@@ -56,26 +60,42 @@ func (bm *brokerManager) getDefault() *broker {
 	return bm.defaultBroker
 }
 
-func (bm *brokerManager) refreshTopics(topics []*string) error {
-	b := bm.getDefault()
-	if b == nil {
-		return OutOfBrokers{}
-	}
+func (bm *brokerManager) tryDefaultBrokers(req encoder, res decoder) error {
+	for b := bm.getDefault(); b != nil; b = bm.getDefault() {
+		responseChan, err := b.sendRequest(bm.client.id, req)
+		if err != nil {
+			return err
+		}
 
-	responseChan, err := b.sendRequest(bm.client.id, &metadataRequest{topics})
-	if err != nil {
-		return err
+		select {
+		case buf := <-responseChan.packets:
+			decoder := realDecoder{raw: buf}
+			err = res.decode(&decoder)
+			return err
+		case <-responseChan.errors:
+			bm.defaultBroker = nil
+			bm.brokersLock.Lock()
+			delete(bm.brokers, b.id)
+			bm.brokersLock.Unlock()
+		}
 	}
+	return OutOfBrokers{}
+}
 
-	decoder := realDecoder{raw: <-responseChan}
+func (bm *brokerManager) refreshTopics(topics []*string) error {
 	response := new(metadata)
-	err = response.decode(&decoder)
+	err := bm.tryDefaultBrokers(&metadataRequest{topics}, response)
 	if err != nil {
 		return err
 	}
 
-	bm.leadersLock.Lock()
-	defer bm.leadersLock.Unlock()
+	bm.brokersLock.Lock()
+	defer bm.brokersLock.Unlock()
+
+	for i := range response.brokers {
+		broker := &response.brokers[i]
+		bm.brokers[broker.id] = broker
+	}
 
 	for i := range response.topics {
 		topic := &response.topics[i]
@@ -87,7 +107,7 @@ func (bm *brokerManager) refreshTopics(topics []*string) error {
 			if partition.err != NO_ERROR {
 				return partition.err
 			}
-			bm.leaders[brokerKey{*topic.name, partition.id}] = response.brokerById(partition.leader)
+			bm.leaders[topicPartition{*topic.name, partition.id}] = partition.leader
 		}
 	}
 

+ 1 - 1
encoderDecoder.go

@@ -5,7 +5,7 @@ type encoder interface {
 }
 
 type decoder interface {
-	decoder(pd packetDecoder)
+	decode(pd packetDecoder) error
 }
 
 type encoderDecoder interface {

+ 0 - 11
metadata.go

@@ -3,8 +3,6 @@ package kafka
 type metadata struct {
 	brokers []broker
 	topics  []topicMetadata
-
-	brokerMap map[int32]*broker
 }
 
 func (m *metadata) encode(pe packetEncoder) {
@@ -25,13 +23,11 @@ func (m *metadata) decode(pd packetDecoder) (err error) {
 	}
 
 	m.brokers = make([]broker, n)
-	m.brokerMap = make(map[int32]*broker, n)
 	for i := 0; i < n; i++ {
 		err = (&m.brokers[i]).decode(pd)
 		if err != nil {
 			return err
 		}
-		m.brokerMap[m.brokers[i].nodeId] = &m.brokers[i]
 	}
 
 	n, err = pd.getArrayCount()
@@ -49,10 +45,3 @@ func (m *metadata) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
-
-func (m *metadata) brokerById(id int32) *broker {
-	if m.brokerMap == nil {
-		return nil
-	}
-	return m.brokerMap[id]
-}