
Rework how the client connects to brokers.

Fixes #9.

This ended up being more complicated than I had hoped and touched several
different areas. The TL;DR is that we now connect to the other brokers in the
cluster asynchronously; connection errors only show up when somebody actually
tries to use that broker.

This is better than the old behaviour since it means that if some brokers in a
cluster go down but the topics we care about are still available, we just keep
going instead of blowing up for no reason.

The complicated part is that simply calling `go broker.Connect()` doesn't do
what we want, so I had to write a `broker.AsyncConnect()`. The problem occurs if
you've got code like this:
    go broker.Connect()
    // do some stuff
    broker.SendSomeMessage()
What can happen is that SendSomeMessage runs before the Connect() goroutine
ever gets scheduled, in which case SendSomeMessage simply returns NotConnected.
The desired behaviour is that SendSomeMessage waits for the connect to finish,
which means AsyncConnect() has to *synchronously* take the broker lock before
it launches the asynchronous connect call. Lots of fun.
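To make the difference concrete, here is a minimal standalone sketch of the locking
pattern (the fakeBroker type, its fields, and the sleep are made up for illustration;
the real implementation is in the broker.go diff below):

    package main

    import (
        "errors"
        "fmt"
        "sync"
        "time"
    )

    var NotConnected = errors.New("not connected")

    type fakeBroker struct {
        lock      sync.Mutex
        connected bool
        connErr   error
    }

    // AsyncConnect takes the lock *synchronously*, then finishes connecting in a
    // goroutine. Anyone who grabs the lock afterwards is guaranteed to see either
    // a finished connection or the stored connection error, never a bare
    // "not connected because the goroutine hasn't run yet" state.
    func (b *fakeBroker) AsyncConnect() {
        b.lock.Lock()
        go func() {
            defer b.lock.Unlock()
            time.Sleep(50 * time.Millisecond) // stand-in for the real dial
            b.connected = true
        }()
    }

    // SendSomeMessage stands in for any operation that needs the connection.
    func (b *fakeBroker) SendSomeMessage() error {
        b.lock.Lock()
        defer b.lock.Unlock()
        if !b.connected {
            if b.connErr != nil {
                return b.connErr
            }
            return NotConnected
        }
        return nil
    }

    func main() {
        b := new(fakeBroker)
        b.AsyncConnect()
        // Blocks on the lock until the connect goroutine finishes, so this
        // reports success (nil) instead of NotConnected.
        fmt.Println(b.SendSomeMessage())
    }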

And a bonus change in this commit: rather than special-casing leader == -1 in
`client.cachedLeader` and adding a big long comment to the LEADER_NOT_AVAILABLE
case explaining the fallthrough statement, just delete that partition from the
hash. So much easier to follow; I must have been on crack when I wrote the old
way.
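
For illustration, a minimal standalone sketch of the new map handling (the topic
name and broker IDs are made up; client.leaders has the same
topic -> partition ID -> leader broker ID shape as in the diff below):

    package main

    import "fmt"

    func main() {
        // leaders maps topic -> partition ID -> leader broker ID.
        leaders := map[string]map[int32]int32{
            "my_topic": {0: 2, 1: 3},
        }

        // LEADER_NOT_AVAILABLE for partition 1: instead of storing -1 and making
        // every reader special-case it, just drop the entry. A plain lookup miss
        // then means "no cached leader, go refresh metadata".
        delete(leaders["my_topic"], 1)

        if id, ok := leaders["my_topic"][1]; ok {
            fmt.Println("cached leader:", id)
        } else {
            fmt.Println("no cached leader, refresh metadata")
        }
    }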
Evan Huus, 12 years ago
parent commit a0e96da4d1
2 changed files with 42 additions and 22 deletions:
  1. broker.go (+30 -2)
  2. client.go (+12 -20)

broker.go (+30 -2)

@@ -14,6 +14,7 @@ type Broker struct {
 
 	correlation_id int32
 	conn           net.Conn
+	conn_err       error
 	lock           sync.Mutex
 
 	responses chan responsePromise
@@ -27,7 +28,7 @@ type responsePromise struct {
 }
 
 // NewBroker creates and returns a Broker targeting the given host:port address.
-// This does not attempt to actually connect, you have to call Connect() for that.
+// This does not attempt to actually connect, you have to call Connect() or AsyncConnect() for that.
 func NewBroker(host string, port int32) *Broker {
 	b := new(Broker)
 	b.id = -1 // don't know it yet
@@ -40,17 +41,39 @@ func (b *Broker) Connect() error {
 	b.lock.Lock()
 	defer b.lock.Unlock()
 
+	return b.connect()
+}
+
+// AsyncConnect tries to connect to the Broker in a non-blocking way. Calling `broker.AsyncConnect()` is
+// *NOT* the same as calling `go broker.Connect()` - AsyncConnect takes the broker lock synchronously before
+// launching its goroutine, so that subsequent operations on the broker are guaranteed to block waiting for
+// the connection instead of simply returning NotConnected. This does mean that if someone is already operating
+// on the broker, AsyncConnect may not be truly asynchronous while it waits for the lock.
+func (b *Broker) AsyncConnect() {
+	b.lock.Lock()
+
+	go func() {
+		defer b.lock.Unlock()
+		b.connect()
+	}()
+
+}
+
+func (b *Broker) connect() error {
 	if b.conn != nil {
 		return AlreadyConnected
 	}
+	b.conn_err = nil
 
 	addr, err := net.ResolveIPAddr("ip", b.host)
 	if err != nil {
+		b.conn_err = err
 		return err
 	}
 
 	b.conn, err = net.DialTCP("tcp", nil, &net.TCPAddr{IP: addr.IP, Port: int(b.port)})
 	if err != nil {
+		b.conn_err = err
 		return err
 	}
 
@@ -78,6 +101,7 @@ func (b *Broker) Close() error {
 	err := b.conn.Close()
 
 	b.conn = nil
+	b.conn_err = nil
 	b.done = nil
 	b.responses = nil
 
@@ -184,7 +208,11 @@ func (b *Broker) send(clientID string, req requestEncoder, promiseResponse bool)
 	defer b.lock.Unlock()
 
 	if b.conn == nil {
-		return nil, NotConnected
+		if b.conn_err != nil {
+			return nil, b.conn_err
+		} else {
+			return nil, NotConnected
+		}
 	}
 
 	fullRequest := request{b.correlation_id, clientID, req}

client.go (+12 -20)

@@ -177,7 +177,7 @@ func (client *Client) cachedLeader(topic string, partition_id int32) *Broker {
 	partitions := client.leaders[topic]
 	if partitions != nil {
 		leader, ok := partitions[partition_id]
-		if ok && leader != -1 {
+		if ok {
 			return client.brokers[leader]
 		}
 	}
@@ -205,34 +205,29 @@ func (client *Client) cachedPartitions(topic string) []int32 {
 
 // if no fatal error, returns a list of topics that need retrying due to LEADER_NOT_AVAILABLE
 func (client *Client) update(data *MetadataResponse) ([]string, error) {
+	client.lock.Lock()
+	defer client.lock.Unlock()
+
 	// First discard brokers that we already know about. This avoids bouncing TCP connections,
 	// and especially avoids closing valid connections out from under other code which may be trying
-	// to use them. We only need a read-lock for this.
+	// to use them.
 	var newBrokers []*Broker
-	client.lock.RLock()
 	for _, broker := range data.Brokers {
 		if !broker.Equals(client.brokers[broker.ID()]) {
 			newBrokers = append(newBrokers, broker)
 		}
 	}
-	client.lock.RUnlock()
-
-	// connect to the brokers before taking the write lock, as this can take a while
-	// to timeout if one of them isn't reachable
-	for _, broker := range newBrokers {
-		err := broker.Connect()
-		if err != nil {
-			return nil, err
-		}
-	}
-
-	client.lock.Lock()
-	defer client.lock.Unlock()
 
+	// Now asynchronously try to open connections to the new brokers. We don't care if they
+	// fail, since maybe that broker is unreachable but doesn't have a topic we care about.
+	// If it fails and we do care, whoever tries to use it will get the connection error.
+	// If we have an old broker with that ID (but a different host/port, since they didn't
+	// compare as equals above) then close and remove that broker before saving the new one.
 	for _, broker := range newBrokers {
 		if client.brokers[broker.ID()] != nil {
 			go client.brokers[broker.ID()].Close()
 		}
+		broker.AsyncConnect()
 		client.brokers[broker.ID()] = broker
 	}
 
@@ -251,11 +246,8 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 		for _, partition := range topic.Partitions {
 			switch partition.Err {
 			case LEADER_NOT_AVAILABLE:
-				// in the LEADER_NOT_AVAILABLE case partition.Leader will be -1 because the
-				// partition is in the middle of leader election, so we fallthrough to save it
-				// anyways in order to avoid returning the stale leader (since -1 isn't a valid broker ID)
 				toRetry[topic.Name] = true
-				fallthrough
+				delete(client.leaders[topic.Name], partition.Id)
 			case NO_ERROR:
 				client.leaders[topic.Name][partition.Id] = partition.Leader
 			default: