Browse Source

Merge pull request #382 from Shopify/simplify-seed-storage

Simplify how we manage seed brokers
Evan Huus 10 years ago
parent
commit
2b8887a442
3 changed files with 75 additions and 40 deletions
  1. 19 36
      client.go
  2. 52 2
      client_test.go
  3. 4 2
      utils.go

+ 19 - 36
client.go

@@ -69,9 +69,8 @@ type client struct {
 	// the broker addresses given to us through the constructor are not guaranteed to be returned in
 	// the cluster metadata (I *think* it only returns brokers who are currently leading partitions?)
 	// so we store them separately
-	seedBrokerAddrs []string
-	seedBroker      *Broker
-	deadBrokerAddrs map[string]none
+	seedBrokers []*Broker
+	deadSeeds   []*Broker
 
 	brokers  map[int32]*Broker                       // maps broker ids to brokers
 	metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
@@ -103,14 +102,13 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
 	client := &client{
 		conf:                    conf,
 		closer:                  make(chan none),
-		seedBrokerAddrs:         addrs,
-		seedBroker:              NewBroker(addrs[0]),
-		deadBrokerAddrs:         make(map[string]none),
 		brokers:                 make(map[int32]*Broker),
 		metadata:                make(map[string]map[int32]*PartitionMetadata),
 		cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
 	}
-	_ = client.seedBroker.Open(conf)
+	for _, addr := range addrs {
+		client.seedBrokers = append(client.seedBrokers, NewBroker(addr))
+	}
 
 	// do an initial fetch of all cluster metadata by specifing an empty list of topics
 	err := client.RefreshMetadata()
@@ -151,13 +149,14 @@ func (client *client) Close() error {
 	for _, broker := range client.brokers {
 		safeAsyncClose(broker)
 	}
-	client.brokers = nil
-	client.metadata = nil
 
-	if client.seedBroker != nil {
-		safeAsyncClose(client.seedBroker)
+	for _, broker := range client.seedBrokers {
+		safeAsyncClose(broker)
 	}
 
+	client.brokers = nil
+	client.metadata = nil
+
 	close(client.closer)
 
 	return nil
@@ -326,16 +325,9 @@ func (client *client) disconnectBroker(broker *Broker) {
 	client.lock.Lock()
 	defer client.lock.Unlock()
 
-	client.deadBrokerAddrs[broker.addr] = none{}
-
-	if broker == client.seedBroker {
-		client.seedBrokerAddrs = client.seedBrokerAddrs[1:]
-		if len(client.seedBrokerAddrs) > 0 {
-			client.seedBroker = NewBroker(client.seedBrokerAddrs[0])
-			_ = client.seedBroker.Open(client.conf)
-		} else {
-			client.seedBroker = nil
-		}
+	if len(client.seedBrokers) > 0 && broker == client.seedBrokers[0] {
+		client.deadSeeds = append(client.deadSeeds, broker)
+		client.seedBrokers = client.seedBrokers[1:]
 	} else {
 		// we do this so that our loop in `tryRefreshMetadata` doesn't go on forever,
 		// but we really shouldn't have to; once that loop is made better this case can be
@@ -349,29 +341,20 @@ func (client *client) resurrectDeadBrokers() {
 	client.lock.Lock()
 	defer client.lock.Unlock()
 
-	for _, addr := range client.seedBrokerAddrs {
-		client.deadBrokerAddrs[addr] = none{}
-	}
-
-	client.seedBrokerAddrs = []string{}
-	for addr := range client.deadBrokerAddrs {
-		client.seedBrokerAddrs = append(client.seedBrokerAddrs, addr)
-	}
-	client.deadBrokerAddrs = make(map[string]none)
-
-	client.seedBroker = NewBroker(client.seedBrokerAddrs[0])
-	_ = client.seedBroker.Open(client.conf)
+	client.seedBrokers = append(client.seedBrokers, client.deadSeeds...)
+	client.deadSeeds = nil
 }
 
 func (client *client) any() *Broker {
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 
-	if client.seedBroker != nil {
-		_ = client.seedBroker.Open(client.conf)
-		return client.seedBroker
+	if len(client.seedBrokers) > 0 {
+		_ = client.seedBrokers[0].Open(client.conf)
+		return client.seedBrokers[0]
 	}
 
+	// not guaranteed to be random *or* deterministic
 	for _, broker := range client.brokers {
 		_ = broker.Open(client.conf)
 		return broker

+ 52 - 2
client_test.go

@@ -2,6 +2,7 @@ package sarama
 
 import (
 	"io"
+	"sync"
 	"testing"
 )
 
@@ -300,11 +301,10 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse2)
 
-	c, err := NewClient([]string{seedBroker.Addr()}, nil)
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	client := c.(*client)
 
 	parts, err := client.Partitions("my_topic")
 	if err != nil {
@@ -324,3 +324,53 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	seedBroker.Close()
 	safeClose(t, client)
 }
+
+func TestClientResurrectDeadSeeds(t *testing.T) {
+	seed1 := newMockBroker(t, 1)
+	seed2 := newMockBroker(t, 2)
+	seed3 := newMockBroker(t, 3)
+	addr1 := seed1.Addr()
+	addr2 := seed2.Addr()
+	addr3 := seed3.Addr()
+
+	emptyMetadata := new(MetadataResponse)
+	seed1.Returns(emptyMetadata)
+
+	conf := NewConfig()
+	conf.Metadata.Retry.Backoff = 0
+	c, err := NewClient([]string{addr1, addr2, addr3}, conf)
+	if err != nil {
+		t.Fatal(err)
+	}
+	client := c.(*client)
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		if err := client.RefreshMetadata(); err != nil {
+			t.Error(err)
+		}
+		wg.Done()
+	}()
+	seed1.Close()
+	seed2.Close()
+
+	seed1 = newMockBrokerAddr(t, 1, addr1)
+	seed2 = newMockBrokerAddr(t, 2, addr2)
+
+	seed3.Close()
+
+	seed1.Close()
+	seed2.Returns(emptyMetadata)
+
+	wg.Wait()
+
+	if len(client.seedBrokers) != 2 {
+		t.Error("incorrect number of live seeds")
+	}
+	if len(client.deadSeeds) != 1 {
+		t.Error("incorrect number of dead seeds")
+	}
+
+	safeClose(t, c)
+}

+ 4 - 2
utils.go

@@ -45,8 +45,10 @@ func withRecover(fn func()) {
 func safeAsyncClose(b *Broker) {
 	tmp := b // local var prevents clobbering in goroutine
 	go withRecover(func() {
-		if err := tmp.Close(); err != nil {
-			Logger.Println("Error closing broker", tmp.ID(), ":", err)
+		if connected, _ := tmp.Connected(); connected {
+			if err := tmp.Close(); err != nil {
+				Logger.Println("Error closing broker", tmp.ID(), ":", err)
+			}
 		}
 	})
 }