瀏覽代碼

Preparatory steps for the new consumer

Some miscellanious changes I found necessary while developing it, that have been
separated out so they can land first:
- define `type none struct{}` for convenience
- implement `FetchResponse.AddError()`
- prefer the seedBroker when fetching metadata, it makes client behaviour much
  more deterministic and easier to test in multi-broker setups
Evan Huus 11 年之前
父節點
當前提交
75c8106b0b
共有 3 個文件被更改,包括 31 次插入8 次删除
  1. 12 8
      client.go
  2. 17 0
      fetch_response.go
  3. 2 0
      utils.go

+ 12 - 8
client.go

@@ -21,14 +21,14 @@ type ClientConfig struct {
 type Client struct {
 	id     string
 	config ClientConfig
-	closer chan struct{}
+	closer chan none
 
 	// the broker addresses given to us through the constructor are not guaranteed to be returned in
 	// the cluster metadata (I *think* it only returns brokers who are currently leading partitions?)
 	// so we store them separately
 	seedBrokerAddrs []string
 	seedBroker      *Broker
-	deadBrokerAddrs map[string]struct{}
+	deadBrokerAddrs map[string]none
 
 	brokers  map[int32]*Broker                       // maps broker ids to brokers
 	metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
@@ -56,10 +56,10 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 	client := &Client{
 		id:              id,
 		config:          *config,
-		closer:          make(chan struct{}),
+		closer:          make(chan none),
 		seedBrokerAddrs: addrs,
 		seedBroker:      NewBroker(addrs[0]),
-		deadBrokerAddrs: make(map[string]struct{}),
+		deadBrokerAddrs: make(map[string]none),
 		brokers:         make(map[int32]*Broker),
 		metadata:        make(map[string]map[int32]*PartitionMetadata),
 	}
@@ -309,7 +309,7 @@ func (client *Client) disconnectBroker(broker *Broker) {
 	defer client.lock.Unlock()
 	Logger.Printf("Disconnecting Broker %d\n", broker.ID())
 
-	client.deadBrokerAddrs[broker.addr] = struct{}{}
+	client.deadBrokerAddrs[broker.addr] = none{}
 
 	if broker == client.seedBroker {
 		client.seedBrokerAddrs = client.seedBrokerAddrs[1:]
@@ -397,14 +397,14 @@ func (client *Client) resurrectDeadBrokers() {
 	defer client.lock.Unlock()
 
 	for _, addr := range client.seedBrokerAddrs {
-		client.deadBrokerAddrs[addr] = struct{}{}
+		client.deadBrokerAddrs[addr] = none{}
 	}
 
 	client.seedBrokerAddrs = []string{}
 	for addr := range client.deadBrokerAddrs {
 		client.seedBrokerAddrs = append(client.seedBrokerAddrs, addr)
 	}
-	client.deadBrokerAddrs = make(map[string]struct{})
+	client.deadBrokerAddrs = make(map[string]none)
 
 	client.seedBroker = NewBroker(client.seedBrokerAddrs[0])
 	_ = client.seedBroker.Open(client.config.DefaultBrokerConf)
@@ -414,11 +414,15 @@ func (client *Client) any() *Broker {
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 
+	if client.seedBroker != nil {
+		return client.seedBroker
+	}
+
 	for _, broker := range client.brokers {
 		return broker
 	}
 
-	return client.seedBroker
+	return nil
 }
 
 func (client *Client) cachedLeader(topic string, partitionID int32) (*Broker, error) {

+ 17 - 0
fetch_response.go

@@ -128,6 +128,23 @@ func (fr *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseB
 	return fr.Blocks[topic][partition]
 }
 
+func (fr *FetchResponse) AddError(topic string, partition int32, err KError) {
+	if fr.Blocks == nil {
+		fr.Blocks = make(map[string]map[int32]*FetchResponseBlock)
+	}
+	partitions, ok := fr.Blocks[topic]
+	if !ok {
+		partitions = make(map[int32]*FetchResponseBlock)
+		fr.Blocks[topic] = partitions
+	}
+	frb, ok := partitions[partition]
+	if !ok {
+		frb = new(FetchResponseBlock)
+		partitions[partition] = frb
+	}
+	frb.Err = err
+}
+
 func (fr *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
 	if fr.Blocks == nil {
 		fr.Blocks = make(map[string]map[int32]*FetchResponseBlock)

+ 2 - 0
utils.go

@@ -5,6 +5,8 @@ import (
 	"sort"
 )
 
+type none struct{}
+
 // make []int32 sortable so we can sort partition numbers
 type int32Slice []int32