
Add Client.Coordinator(consumerGroup) and Client.RefreshCoordinator(consumerGroup) to retrieve the coordinating broker for a consumer group.

Willem van Bergen, 10 years ago
parent
commit
95649f54ca
4 changed files with 308 additions and 14 deletions
  1. client.go (+137 -14)
  2. client_test.go (+132 -0)
  3. consumer_metadata_response.go (+15 -0)
  4. functional_client_test.go (+24 -0)
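
For readers browsing this commit, a minimal usage sketch of the new API follows. The broker address, the group name, and the github.com/Shopify/sarama import path used here are assumptions for illustration (error handling abbreviated), not part of the change itself.

package main

import (
	"log"

	"github.com/Shopify/sarama" // assumed import path for this version of the library
)

func main() {
	// Assumes a Kafka 0.8.2+ broker is reachable at this address.
	client, err := sarama.NewClient([]string{"localhost:9092"}, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// The first call issues a ConsumerMetadataRequest; the result is cached.
	broker, err := client.Coordinator("my_group") // "my_group" is a hypothetical group name
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("coordinator for my_group is broker #%d at %s", broker.ID(), broker.Addr())

	// If the coordinator has moved, refresh the cached value explicitly.
	if err := client.RefreshCoordinator("my_group"); err != nil {
		log.Fatal(err)
	}
}

Subsequent Coordinator calls return the cached broker; RefreshCoordinator forces a new lookup, which is exactly what the unit tests below exercise.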

client.go: +137 -14

@@ -42,6 +42,13 @@ type Client interface {
 	// offset, OffsetNewest for the offset of the message that will be produced next, or a time.
 	GetOffset(topic string, partitionID int32, time int64) (int64, error)
 
+	// Coordinator returns the coordinating broker for a consumer group. It will return a locally cached
+	// value if it's available. You can call RefreshCoordinator to update the cached value.
+	Coordinator(consumerGroup string) (*Broker, error)
+
+	// RefreshCoordinator retrieves the coordinator for a consumer group and stores it in the local cache.
+	RefreshCoordinator(consumerGroup string) error
+
 	// Close shuts down all broker connections managed by this client. It is required to call this function before
 	// a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
 	// using a client before you close the client.
@@ -72,13 +79,15 @@ type client struct {
 	seedBrokers []*Broker
 	deadSeeds   []*Broker
 
-	brokers  map[int32]*Broker                       // maps broker ids to brokers
-	metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
+	brokers      map[int32]*Broker                       // maps broker ids to brokers
+	metadata     map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
+	coordinators map[string]int32                        // Maps consumer group names to coordinating broker IDs
 
 	// If the number of partitions is large, we can get some churn calling cachedPartitions,
 	// so the result is cached.  It is important to update this value whenever metadata is changed
 	cachedPartitionsResults map[string][maxPartitionIndex][]int32
-	lock                    sync.RWMutex // protects access to the maps, only one since they're always written together
+
+	lock sync.RWMutex // protects access to the maps that hold cluster state.
 }
 
 // NewClient creates a new Client. It connects to one of the given broker addresses
@@ -105,6 +114,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
 		brokers:                 make(map[int32]*Broker),
 		metadata:                make(map[string]map[int32]*PartitionMetadata),
 		cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
+		coordinators:            make(map[string]int32),
 	}
 	for _, addr := range addrs {
 		client.seedBrokers = append(client.seedBrokers, NewBroker(addr))
@@ -304,9 +314,56 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (in
 	return offset, err
 }
 
+func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
+	coordinator := client.cachedCoordinator(consumerGroup)
+
+	if coordinator == nil {
+		if err := client.RefreshCoordinator(consumerGroup); err != nil {
+			return nil, err
+		}
+		coordinator = client.cachedCoordinator(consumerGroup)
+	}
+
+	if coordinator == nil {
+		return nil, ErrConsumerCoordinatorNotAvailable
+	}
+
+	_ = coordinator.Open(client.conf)
+	return coordinator, nil
+}
+
+func (client *client) RefreshCoordinator(consumerGroup string) error {
+	response, err := client.getConsumerMetadata(consumerGroup, client.conf.Metadata.Retry.Max)
+	if err != nil {
+		return err
+	}
+
+	client.lock.Lock()
+	defer client.lock.Unlock()
+	client.registerBroker(response.Coordinator)
+	client.coordinators[consumerGroup] = response.Coordinator.ID()
+	return nil
+}
+
 // private broker management helpers
 
-func (client *client) disconnectBroker(broker *Broker) {
+// registerBroker makes sure a broker received by a Metadata or Coordinator request is registered
+// in the brokers map. If a broker with the same ID is already registered under a different address,
+// the old instance is replaced. You must hold the write lock before calling this function.
+func (client *client) registerBroker(broker *Broker) {
+	if client.brokers[broker.ID()] == nil {
+		client.brokers[broker.ID()] = broker
+		Logger.Printf("client/brokers Registered new broker #%d at %s", broker.ID(), broker.Addr())
+	} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
+		safeAsyncClose(client.brokers[broker.ID()])
+		client.brokers[broker.ID()] = broker
+		Logger.Printf("client/brokers Replaced registered broker #%d with %s", broker.ID(), broker.Addr())
+	}
+}
+
+// deregisterBroker removes a broker from the seedBrokers list, and if it's
+// not a seed broker, removes it from the brokers map completely.
+func (client *client) deregisterBroker(broker *Broker) {
 	client.lock.Lock()
 	defer client.lock.Unlock()
 
@@ -316,8 +373,9 @@ func (client *client) disconnectBroker(broker *Broker) {
 	} else {
 		// we do this so that our loop in `tryRefreshMetadata` doesn't go on forever,
 		// but we really shouldn't have to; once that loop is made better this case can be
-		// removed, and the function generally can be renamed from `disconnectBroker` to
+		// removed, and the function generally can be renamed from `deregisterBroker` to
 		// `nextSeedBroker` or something
+		Logger.Printf("client/brokers Deregistered broker #%d at %s", broker.ID(), broker.Addr())
 		delete(client.brokers, broker.ID())
 	}
 }
@@ -511,7 +569,7 @@ func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int)
 			// some other error, remove that broker and try again
 			Logger.Println("Error from broker while fetching metadata:", err)
 			_ = broker.Close()
-			client.disconnectBroker(broker)
+			client.deregisterBroker(broker)
 		}
 	}
 
@@ -538,14 +596,7 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
 	// - if it is an existing ID, but the address we have is stale, discard the old one and save it
 	// - otherwise ignore it, replacing our existing one would just bounce the connection
 	for _, broker := range data.Brokers {
-		if client.brokers[broker.ID()] == nil {
-			client.brokers[broker.ID()] = broker
-			Logger.Printf("Registered new broker #%d at %s", broker.ID(), broker.Addr())
-		} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
-			safeAsyncClose(client.brokers[broker.ID()])
-			client.brokers[broker.ID()] = broker
-			Logger.Printf("Replaced registered broker #%d with %s", broker.ID(), broker.Addr())
-		}
+		client.registerBroker(broker)
 	}
 
 	toRetry := make(map[string]bool)
@@ -595,3 +646,75 @@ func (client *client) updateMetadata(data *MetadataResponse) ([]string, error) {
 	}
 	return ret, err
 }
+
+func (client *client) cachedCoordinator(consumerGroup string) *Broker {
+	client.lock.RLock()
+	defer client.lock.RUnlock()
+	if coordinatorID, ok := client.coordinators[consumerGroup]; ok {
+		return client.brokers[coordinatorID]
+	}
+
+	return nil
+}
+
+func (client *client) getConsumerMetadata(consumerGroup string, retriesRemaining int) (*ConsumerMetadataResponse, error) {
+	for broker := client.any(); broker != nil; broker = client.any() {
+		Logger.Printf("client/coordinator Requesting coordinator for consumer group %s from %s.\n", consumerGroup, broker.Addr())
+
+		request := new(ConsumerMetadataRequest)
+		request.ConsumerGroup = consumerGroup
+
+		response, err := broker.GetConsumerMetadata(request)
+
+		if err != nil {
+			Logger.Printf("client/coordinator Request to broker %s failed: %s.\n", broker.Addr(), err)
+
+			switch err.(type) {
+			case PacketEncodingError:
+				return nil, err
+			default:
+				_ = broker.Close()
+				client.deregisterBroker(broker)
+				continue
+			}
+		}
+
+		switch response.Err {
+		case ErrNoError:
+			Logger.Printf("client/coordinator Coordinator for consumer group %s is #%d (%s:%d).\n", consumerGroup, response.CoordinatorID, response.CoordinatorHost, response.CoordinatorPort)
+			return response, nil
+
+		case ErrConsumerCoordinatorNotAvailable:
+			Logger.Printf("client/coordinator Coordinator for consumer group %s is not available.\n", consumerGroup)
+
+			// This is very ugly, but this scenario will only happen once per cluster.
+			// The __consumer_offsets topic only has to be created once.
+			// The number of partitions is not configurable, but partition 0 should always exist.
+			if _, err := client.Leader("__consumer_offsets", 0); err != nil {
+				Logger.Printf("client/coordinator The __consumer_offsets topic is not fully initialized yet. Waiting 2 seconds...\n")
+				time.Sleep(2 * time.Second)
+			}
+
+			if retriesRemaining > 0 {
+				Logger.Printf("Retrying after %dms... (%d retries remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining)
+				time.Sleep(client.conf.Metadata.Retry.Backoff)
+				return client.getConsumerMetadata(consumerGroup, retriesRemaining-1)
+			}
+			return nil, ErrConsumerCoordinatorNotAvailable
+
+		default:
+			return nil, response.Err
+		}
+	}
+
+	Logger.Println("Out of available brokers to request consumer metadata from.")
+
+	if retriesRemaining > 0 {
+		Logger.Printf("Resurrecting dead brokers after %dms... (%d retries remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, retriesRemaining)
+		time.Sleep(client.conf.Metadata.Retry.Backoff)
+		client.resurrectDeadBrokers()
+		return client.getConsumerMetadata(consumerGroup, retriesRemaining-1)
+	}
+
+	return nil, ErrOutOfBrokers
+}
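
Note that the retry loop above reuses the existing Metadata.Retry settings rather than introducing new configuration. A hedged sketch of tuning them for coordinator lookups; the broker address, group name, and retry values are illustrative, and the import path is assumed as above.

package main

import (
	"log"
	"time"

	"github.com/Shopify/sarama" // assumed import path for this version of the library
)

func main() {
	// These are the same settings getConsumerMetadata consults: Retry.Max bounds the
	// number of attempts, Retry.Backoff is the sleep between them.
	config := sarama.NewConfig()
	config.Metadata.Retry.Max = 5
	config.Metadata.Retry.Backoff = 250 * time.Millisecond

	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// A brand-new group may initially hit ErrConsumerCoordinatorNotAvailable while
	// the __consumer_offsets topic is being created; the retries above absorb that.
	broker, err := client.Coordinator("brand_new_group")
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("coordinator: %s", broker.Addr())
}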

client_test.go: +132 -0

@@ -422,3 +422,135 @@ func TestClientResurrectDeadSeeds(t *testing.T) {
 
 	safeClose(t, c)
 }
+
+func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+	staleCoordinator := newMockBroker(t, 2)
+	freshCoordinator := newMockBroker(t, 3)
+
+	replicas := []int32{staleCoordinator.BrokerID(), freshCoordinator.BrokerID()}
+	metadataResponse1 := new(MetadataResponse)
+	metadataResponse1.AddBroker(staleCoordinator.Addr(), staleCoordinator.BrokerID())
+	metadataResponse1.AddBroker(freshCoordinator.Addr(), freshCoordinator.BrokerID())
+	metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, ErrNoError)
+	seedBroker.Returns(metadataResponse1)
+
+	client, err := NewClient([]string{seedBroker.Addr()}, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	coordinatorResponse1 := new(ConsumerMetadataResponse)
+	coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
+	seedBroker.Returns(coordinatorResponse1)
+
+	coordinatorResponse2 := new(ConsumerMetadataResponse)
+	coordinatorResponse2.CoordinatorID = staleCoordinator.BrokerID()
+	coordinatorResponse2.CoordinatorHost = "127.0.0.1"
+	coordinatorResponse2.CoordinatorPort = staleCoordinator.Port()
+
+	seedBroker.Returns(coordinatorResponse2)
+
+	broker, err := client.Coordinator("my_group")
+	if err != nil {
+		t.Error(err)
+	}
+
+	if staleCoordinator.Addr() != broker.Addr() {
+		t.Errorf("Expected coordinator to have address %s, found %s", staleCoordinator.Addr(), broker.Addr())
+	}
+
+	if staleCoordinator.BrokerID() != broker.ID() {
+		t.Errorf("Expected coordinator to have ID %d, found %d", staleCoordinator.BrokerID(), broker.ID())
+	}
+
+	// Grab the cached value
+	broker2, err := client.Coordinator("my_group")
+	if err != nil {
+		t.Error(err)
+	}
+
+	if broker2.Addr() != broker.Addr() {
+		t.Errorf("Expected the coordinator to be the same, but found %s vs. %s", broker2.Addr(), broker.Addr())
+	}
+
+	coordinatorResponse3 := new(ConsumerMetadataResponse)
+	coordinatorResponse3.CoordinatorID = freshCoordinator.BrokerID()
+	coordinatorResponse3.CoordinatorHost = "127.0.0.1"
+	coordinatorResponse3.CoordinatorPort = freshCoordinator.Port()
+
+	seedBroker.Returns(coordinatorResponse3)
+
+	// Refresh the locally cached value because it's stale
+	if err := client.RefreshCoordinator("my_group"); err != nil {
+		t.Error(err)
+	}
+
+	// Grab the fresh value
+	broker3, err := client.Coordinator("my_group")
+	if err != nil {
+		t.Error(err)
+	}
+
+	if broker3.Addr() != freshCoordinator.Addr() {
+		t.Errorf("Expected the freshCoordinator to be returned, but found %s.", broker3.Addr())
+	}
+
+	freshCoordinator.Close()
+	staleCoordinator.Close()
+	seedBroker.Close()
+	safeClose(t, client)
+}
+
+func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
+	seedBroker := newMockBroker(t, 1)
+	coordinator := newMockBroker(t, 2)
+
+	metadataResponse1 := new(MetadataResponse)
+	seedBroker.Returns(metadataResponse1)
+
+	config := NewConfig()
+	config.Metadata.Retry.Max = 1
+	config.Metadata.Retry.Backoff = 0
+	client, err := NewClient([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	coordinatorResponse1 := new(ConsumerMetadataResponse)
+	coordinatorResponse1.Err = ErrConsumerCoordinatorNotAvailable
+	seedBroker.Returns(coordinatorResponse1)
+
+	metadataResponse2 := new(MetadataResponse)
+	metadataResponse2.AddTopic("__consumer_offsets", ErrUnknownTopicOrPartition)
+	seedBroker.Returns(metadataResponse2)
+
+	replicas := []int32{coordinator.BrokerID()}
+	metadataResponse3 := new(MetadataResponse)
+	metadataResponse3.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, ErrNoError)
+	seedBroker.Returns(metadataResponse3)
+
+	coordinatorResponse2 := new(ConsumerMetadataResponse)
+	coordinatorResponse2.CoordinatorID = coordinator.BrokerID()
+	coordinatorResponse2.CoordinatorHost = "127.0.0.1"
+	coordinatorResponse2.CoordinatorPort = coordinator.Port()
+
+	seedBroker.Returns(coordinatorResponse2)
+
+	broker, err := client.Coordinator("my_group")
+	if err != nil {
+		t.Error(err)
+	}
+
+	if coordinator.Addr() != broker.Addr() {
+		t.Errorf("Expected coordinator to have address %s, found %s", coordinator.Addr(), broker.Addr())
+	}
+
+	if coordinator.BrokerID() != broker.ID() {
+		t.Errorf("Expected coordinator to have ID %d, found %d", coordinator.BrokerID(), broker.ID())
+	}
+
+	coordinator.Close()
+	seedBroker.Close()
+	safeClose(t, client)
+}

consumer_metadata_response.go: +15 -0

@@ -41,3 +41,18 @@ func (r *ConsumerMetadataResponse) decode(pd packetDecoder) (err error) {
 
 	return nil
 }
+
+func (r *ConsumerMetadataResponse) encode(pe packetEncoder) error {
+
+	pe.putInt16(int16(r.Err))
+
+	pe.putInt32(r.CoordinatorID)
+
+	if err := pe.putString(r.CoordinatorHost); err != nil {
+		return err
+	}
+
+	pe.putInt32(r.CoordinatorPort)
+
+	return nil
+}
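
The new encode method mirrors the existing decode; it appears to be what lets the unit tests above hand canned ConsumerMetadataResponse values to a mock broker. A small illustrative sketch of constructing such a response; all field values here are made up, and the import path is assumed as before.

package main

import (
	"fmt"

	"github.com/Shopify/sarama" // assumed import path for this version of the library
)

func main() {
	// Build a ConsumerMetadataResponse by hand, the same way the unit tests above do
	// before handing it to a mock broker.
	response := new(sarama.ConsumerMetadataResponse)
	response.Err = sarama.ErrNoError
	response.CoordinatorID = 2
	response.CoordinatorHost = "127.0.0.1"
	response.CoordinatorPort = 9092

	fmt.Printf("coordinator #%d at %s:%d\n",
		response.CoordinatorID, response.CoordinatorHost, response.CoordinatorPort)
}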

functional_client_test.go: +24 -0

@@ -1,6 +1,7 @@
 package sarama
 
 import (
+	"fmt"
 	"testing"
 	"time"
 )
@@ -56,3 +57,26 @@ func TestFuncClientMetadata(t *testing.T) {
 
 	safeClose(t, client)
 }
+
+func TestFuncClientCoordinator(t *testing.T) {
+	checkKafkaVersion(t, "0.8.2")
+	checkKafkaAvailability(t)
+
+	client, err := NewClient(kafkaBrokers, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 10; i++ {
+		broker, err := client.Coordinator(fmt.Sprintf("another_new_consumer_group_%d", i))
+		if err != nil {
+			t.Error(err)
+		}
+
+		if connected, err := broker.Connected(); !connected || err != nil {
+			t.Errorf("Expected coordinator broker %s to be properly connected.", broker.Addr())
+		}
+	}
+
+	safeClose(t, client)
+}