Selaa lähdekoodia

Merge pull request #10 from Shopify/handle_unreachable_brokers

Rework how the client connects to brokers.
Evan Huus 12 vuotta sitten
vanhempi
commit
7626e3d97c
4 muutettua tiedostoa jossa 64 lisäystä ja 42 poistoa
  1. 40 16
      broker.go
  2. 6 4
      broker_test.go
  3. 17 21
      client.go
  4. 1 1
      errors.go

+ 40 - 16
broker.go

@@ -14,6 +14,7 @@ type Broker struct {
 
 	correlation_id int32
 	conn           net.Conn
+	conn_err       error
 	lock           sync.Mutex
 
 	responses chan responsePromise
@@ -27,7 +28,7 @@ type responsePromise struct {
 }
 
 // NewBroker creates and returns a Broker targeting the given host:port address.
-// This does not attempt to actually connect, you have to call Connect() for that.
+// This does not attempt to actually connect; you have to call Open() for that.
 func NewBroker(host string, port int32) *Broker {
 	b := new(Broker)
 	b.id = -1 // don't know it yet
@@ -36,34 +37,52 @@ func NewBroker(host string, port int32) *Broker {
 	return b
 }
 
-func (b *Broker) Connect() error {
+// Open tries to connect to the Broker. It takes the broker lock synchronously, then spawns a goroutine which
+// connects and releases the lock. This means any subsequent operations on the broker will block waiting for
+// the connection to finish. To get the effect of a fully synchronous Open call, follow it by a call to Connected().
+// The only error Open will return directly is AlreadyConnected.
+func (b *Broker) Open() error {
 	b.lock.Lock()
-	defer b.lock.Unlock()
 
 	if b.conn != nil {
+		b.lock.Unlock()
 		return AlreadyConnected
 	}
 
-	addr, err := net.ResolveIPAddr("ip", b.host)
-	if err != nil {
-		return err
-	}
+	go func() {
+		defer b.lock.Unlock()
 
-	b.conn, err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port)})
-	if err != nil {
-		return err
-	}
+		var addr *net.IPAddr
+		addr, b.conn_err = net.ResolveIPAddr("ip", b.host)
+		if b.conn_err != nil {
+			return
+		}
 
-	b.done = make(chan bool)
+		b.conn, b.conn_err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port)})
+		if b.conn_err != nil {
+			return
+		}
+
+		b.done = make(chan bool)
 
-	// permit a few outstanding requests before we block waiting for responses
-	b.responses = make(chan responsePromise, 4)
+		// permit a few outstanding requests before we block waiting for responses
+		b.responses = make(chan responsePromise, 4)
 
-	go b.responseReceiver()
+		go b.responseReceiver()
+	}()
 
 	return nil
 }
 
+// Connected returns true if the broker is connected and false otherwise. If the broker is not
+// connected but it had tried to connect, the error from that connection attempt is also returned.
+func (b *Broker) Connected() (bool, error) {
+	b.lock.Lock()
+	defer b.lock.Unlock()
+
+	return b.conn != nil, b.conn_err
+}
+
 func (b *Broker) Close() error {
 	b.lock.Lock()
 	defer b.lock.Unlock()
@@ -78,6 +97,7 @@ func (b *Broker) Close() error {
 	err := b.conn.Close()
 
 	b.conn = nil
+	b.conn_err = nil
 	b.done = nil
 	b.responses = nil
 
@@ -184,7 +204,11 @@ func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool)
 	defer b.lock.Unlock()
 
 	if b.conn == nil {
-		return nil, NotConnected
+		if b.conn_err != nil {
+			return nil, b.conn_err
+		} else {
+			return nil, NotConnected
+		}
 	}
 
 	fullRequest := request{b.correlation_id, clientID, req}

+ 6 - 4
broker_test.go

@@ -141,18 +141,20 @@ func NewMockBroker(t *testing.T, responses chan []byte) *MockBroker {
 
 func ExampleBroker() error {
 	broker := NewBroker("localhost", 9092)
-	err := broker.Connect()
+	err := broker.Open()
 	if err != nil {
 		return err
 	}
+	defer broker.Close()
 
 	request := MetadataRequest{Topics: []string{"myTopic"}}
 	response, err := broker.GetMetadata("myClient", &request)
+	if err != nil {
+		return err
+	}
 
 	fmt.Println("There are", len(response.Topics), "topics active in the cluster.")
 
-	broker.Close()
-
 	return nil
 }
 
@@ -217,7 +219,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
 	defer mockBroker.Close()
 
 	broker := NewBroker("localhost", mockBroker.Port())
-	err := broker.Connect()
+	err := broker.Open()
 	if err != nil {
 		t.Fatal(err)
 	}

+ 17 - 21
client.go

@@ -22,7 +22,11 @@ type Client struct {
 // If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created.
 func NewClient(id string, host string, port int32) (client *Client, err error) {
 	tmp := NewBroker(host, port)
-	err = tmp.Connect()
+	err = tmp.Open()
+	if err != nil {
+		return nil, err
+	}
+	_, err = tmp.Connected()
 	if err != nil {
 		return nil, err
 	}
@@ -177,7 +181,7 @@ func (client *Client) cachedLeader(topic string, partition_id int32) *Broker {
 	partitions := client.leaders[topic]
 	if partitions != nil {
 		leader, ok := partitions[partition_id]
-		if ok && leader != -1 {
+		if ok {
 			return client.brokers[leader]
 		}
 	}
@@ -205,34 +209,29 @@ func (client *Client) cachedPartitions(topic string) []int32 {
 
 // if no fatal error, returns a list of topics that need retrying due to LEADER_NOT_AVAILABLE
 func (client *Client) update(data *MetadataResponse) ([]string, error) {
+	client.lock.Lock()
+	defer client.lock.Unlock()
+
 	// First discard brokers that we already know about. This avoids bouncing TCP connections,
 	// and especially avoids closing valid connections out from under other code which may be trying
-	// to use them. We only need a read-lock for this.
+	// to use them.
 	var newBrokers []*Broker
-	client.lock.RLock()
 	for _, broker := range data.Brokers {
 		if !broker.Equals(client.brokers[broker.ID()]) {
 			newBrokers = append(newBrokers, broker)
 		}
 	}
-	client.lock.RUnlock()
-
-	// connect to the brokers before taking the write lock, as this can take a while
-	// to timeout if one of them isn't reachable
-	for _, broker := range newBrokers {
-		err := broker.Connect()
-		if err != nil {
-			return nil, err
-		}
-	}
-
-	client.lock.Lock()
-	defer client.lock.Unlock()
 
+	// Now asynchronously try to open connections to the new brokers. We don't care if they
+	// fail, since maybe that broker is unreachable but doesn't have a topic we care about.
+	// If it fails and we do care, whoever tries to use it will get the connection error.
+	// If we have an old broker with that ID (but a different host/port, since they didn't
+	// compare as equals above) then close and remove that broker before saving the new one.
 	for _, broker := range newBrokers {
 		if client.brokers[broker.ID()] != nil {
 			go client.brokers[broker.ID()].Close()
 		}
+		broker.Open()
 		client.brokers[broker.ID()] = broker
 	}
 
@@ -251,11 +250,8 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 		for _, partition := range topic.Partitions {
 			switch partition.Err {
 			case LEADER_NOT_AVAILABLE:
-				// in the LEADER_NOT_AVAILABLE case partition.Leader will be -1 because the
-				// partition is in the middle of leader election, so we fallthrough to save it
-				// anyways in order to avoid returning the stale leader (since -1 isn't a valid broker ID)
 				toRetry[topic.Name] = true
-				fallthrough
+				delete(client.leaders[topic.Name], partition.Id)
 			case NO_ERROR:
 				client.leaders[topic.Name][partition.Id] = partition.Leader
 			default:

+ 1 - 1
errors.go

@@ -17,7 +17,7 @@ var IncompleteResponse = errors.New("kafka: Response did not contain all the exp
 // (meaning one outside of the range [0...numPartitions-1]).
 var InvalidPartition = errors.New("kafka: Partitioner returned an invalid partition index.")
 
-// AlreadyConnected is the error returned when calling Connect() on a Broker that is already connected.
+// AlreadyConnected is the error returned when calling Open() on a Broker that is already connected.
 var AlreadyConnected = errors.New("kafka: broker: already connected")
 
 // NotConnected is the error returned when trying to send or call Close() on a Broker that is not connected.