
Merge pull request #215 from Shopify/consistent-partitioning

Consistent partitioning
Evan Huus, 11 years ago
Parent commit: 324253e5ed
9 changed files, 187 additions and 82 deletions
  1. client.go (+73 -35)
  2. client_test.go (+17 -7)
  3. consumer_test.go (+4 -4)
  4. errors.go (+0 -3)
  5. metadata_response.go (+2 -1)
  6. partitioner.go (+32 -11)
  7. partitioner_test.go (+38 -10)
  8. producer.go (+13 -3)
  9. producer_test.go (+8 -8)

+ 73 - 35
client.go

@@ -115,19 +115,44 @@ func (client *Client) Close() error {
 	return nil
 }
 
-// Partitions returns the sorted list of available partition IDs for the given topic.
+// Partitions returns the sorted list of all partition IDs for the given topic.
 func (client *Client) Partitions(topic string) ([]int32, error) {
 	// Check to see whether the client is closed
 	if client.Closed() {
 		return nil, ClosedClient
 	}
 
-	partitions := client.cachedPartitions(topic)
+	partitions := client.cachedPartitions(topic, allPartitions)
+
+	if len(partitions) == 0 {
+		err := client.RefreshTopicMetadata(topic)
+		if err != nil {
+			return nil, err
+		}
+		partitions = client.cachedPartitions(topic, allPartitions)
+	}
+
+	if partitions == nil {
+		return nil, UnknownTopicOrPartition
+	}
+
+	return partitions, nil
+}
+
+// WritablePartitions returns the sorted list of all writable partition IDs for the given topic,
+// where "writable" means "having a valid leader accepting writes".
+func (client *Client) WritablePartitions(topic string) ([]int32, error) {
+	// Check to see whether the client is closed
+	if client.Closed() {
+		return nil, ClosedClient
+	}
+
+	partitions := client.cachedPartitions(topic, writablePartitions)
 
 	// len==0 catches when it's nil (no such topic) and the odd case when every single
 	// partition is undergoing leader election simultaneously. Callers have to be able to handle
 	// this function returning an empty slice (which is a valid return value) but catching it
-	// here the first time (note we *don't* catch it below where we return NoSuchTopic) triggers
+	// here the first time (note we *don't* catch it below where we return UnknownTopicOrPartition) triggers
 	// a metadata refresh as a nicety so callers can just try again and don't have to manually
 	// trigger a refresh (otherwise they'd just keep getting a stale cached copy).
 	if len(partitions) == 0 {
@@ -135,11 +160,11 @@ func (client *Client) Partitions(topic string) ([]int32, error) {
 		if err != nil {
 			return nil, err
 		}
-		partitions = client.cachedPartitions(topic)
+		partitions = client.cachedPartitions(topic, writablePartitions)
 	}
 
 	if partitions == nil {
-		return nil, NoSuchTopic
+		return nil, UnknownTopicOrPartition
 	}
 
 	return partitions, nil
@@ -182,43 +207,53 @@ func (client *Client) getMetadata(topic string, partitionID int32) (*PartitionMetadata, error) {
 }
 
 func (client *Client) Replicas(topic string, partitionID int32) ([]int32, error) {
+	if client.Closed() {
+		return nil, ClosedClient
+	}
+
 	metadata, err := client.getMetadata(topic, partitionID)
 
 	if err != nil {
 		return nil, err
 	}
 
+	if metadata.Err == ReplicaNotAvailable {
+		return nil, metadata.Err
+	}
 	return dupeAndSort(metadata.Replicas), nil
 }
 
 func (client *Client) ReplicasInSync(topic string, partitionID int32) ([]int32, error) {
+	if client.Closed() {
+		return nil, ClosedClient
+	}
+
 	metadata, err := client.getMetadata(topic, partitionID)
 
 	if err != nil {
 		return nil, err
 	}
 
+	if metadata.Err == ReplicaNotAvailable {
+		return nil, metadata.Err
+	}
 	return dupeAndSort(metadata.Isr), nil
 }
 
 // Leader returns the broker object that is the leader of the current topic/partition, as
 // determined by querying the cluster metadata.
 func (client *Client) Leader(topic string, partitionID int32) (*Broker, error) {
-	leader := client.cachedLeader(topic, partitionID)
+	leader, err := client.cachedLeader(topic, partitionID)
 
 	if leader == nil {
 		err := client.RefreshTopicMetadata(topic)
 		if err != nil {
 			return nil, err
 		}
-		leader = client.cachedLeader(topic, partitionID)
+		leader, err = client.cachedLeader(topic, partitionID)
 	}
 
-	if leader == nil {
-		return nil, UnknownTopicOrPartition
-	}
-
-	return leader, nil
+	return leader, err
 }
 
 // RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the
@@ -310,7 +345,7 @@ func (client *Client) refreshMetadata(topics []string, retries int) error {
 	// off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
 	for _, topic := range topics {
 		if len(topic) == 0 {
-			return NoSuchTopic
+			return UnknownTopicOrPartition
 		}
 	}
 
@@ -386,7 +421,7 @@ func (client *Client) any() *Broker {
 	return client.seedBroker
 }
 
-func (client *Client) cachedLeader(topic string, partitionID int32) *Broker {
+func (client *Client) cachedLeader(topic string, partitionID int32) (*Broker, error) {
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 
@@ -394,11 +429,14 @@ func (client *Client) cachedLeader(topic string, partitionID int32) *Broker {
 	if partitions != nil {
 		metadata, ok := partitions[partitionID]
 		if ok {
-			return client.brokers[metadata.Leader]
+			if metadata.Err == LeaderNotAvailable {
+				return nil, metadata.Err
+			}
+			return client.brokers[metadata.Leader], nil
 		}
 	}
 
-	return nil
+	return nil, UnknownTopicOrPartition
 }
 
 func (client *Client) cachedMetadata(topic string, partitionID int32) *PartitionMetadata {
@@ -413,7 +451,12 @@ func (client *Client) cachedMetadata(topic string, partitionID int32) *Partition
 	return nil
 }
 
-func (client *Client) cachedPartitions(topic string) []int32 {
+const (
+	allPartitions = iota
+	writablePartitions
+)
+
+func (client *Client) cachedPartitions(topic string, partitionSet int) []int32 {
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 
@@ -423,8 +466,11 @@ func (client *Client) cachedPartitions(topic string) []int32 {
 	}
 
 	ret := make([]int32, 0, len(partitions))
-	for id := range partitions {
-		ret = append(ret, id)
+	for _, partition := range partitions {
+		if partitionSet == writablePartitions && partition.Err == LeaderNotAvailable {
+			continue
+		}
+		ret = append(ret, partition.ID)
 	}
 
 	sort.Sort(int32Slice(ret))
@@ -479,32 +525,24 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 
 	var err error
 	for _, topic := range data.Topics {
-		switch topic.Err {
-		case NoError:
-			break
-		case LeaderNotAvailable:
-			toRetry[topic.Name] = true
-		default:
+		if topic.Err != NoError {
 			err = topic.Err
+			continue
 		}
+
 		client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
 		for _, partition := range topic.Partitions {
+			client.metadata[topic.Name][partition.ID] = partition
 			switch partition.Err {
 			case NoError:
 				broker := client.brokers[partition.Leader]
 				if _, present := client.deadBrokerAddrs[broker.Addr()]; present {
 					if connected, _ := broker.Connected(); !connected {
 						toRetry[topic.Name] = true
-						delete(client.metadata[topic.Name], partition.ID)
-						continue
 					}
 				}
-				client.metadata[topic.Name][partition.ID] = partition
 			case LeaderNotAvailable:
 				toRetry[topic.Name] = true
-				delete(client.metadata[topic.Name], partition.ID)
-			default:
-				err = partition.Err
 			}
 		}
 	}
@@ -532,12 +570,12 @@ func NewClientConfig() *ClientConfig {
 // Validate checks a ClientConfig instance. This will return a
 // ConfigurationError if the specified values don't make sense.
 func (config *ClientConfig) Validate() error {
-	if config.MetadataRetries <= 0 {
-		return ConfigurationError("Invalid MetadataRetries. Try 10")
+	if config.MetadataRetries < 0 {
+		return ConfigurationError("Invalid MetadataRetries, must be >= 0")
 	}
 
 	if config.WaitForElection <= time.Duration(0) {
-		return ConfigurationError("Invalid WaitForElection. Try 250*time.Millisecond")
+		return ConfigurationError("Invalid WaitForElection, must be > 0")
 	}
 
 	if config.DefaultBrokerConf != nil {
@@ -547,7 +585,7 @@ func (config *ClientConfig) Validate() error {
 	}
 
 	if config.BackgroundRefreshFrequency < time.Duration(0) {
-		return ConfigurationError("Invalid BackgroundRefreshFrequency.")
+		return ConfigurationError("Invalid BackgroundRefreshFrequency, must be >= 0")
 	}
 
 	return nil
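
For downstream users, the split between the two accessors is easiest to see in client code. A minimal sketch, assuming a broker at localhost:9092 and an existing "my_topic" (both illustrative):

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	client, err := sarama.NewClient("example-client", []string{"localhost:9092"}, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Every partition the brokers report, including ones whose
	// leader is currently unavailable.
	all, err := client.Partitions("my_topic")
	if err != nil {
		log.Fatal(err)
	}

	// Only partitions with a valid leader currently accepting writes.
	writable, err := client.WritablePartitions("my_topic")
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("%d partitions total, %d writable\n", len(all), len(writable))
}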

+ 17 - 7
client_test.go

@@ -60,10 +60,13 @@ func TestClientMetadata(t *testing.T) {
 
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb5.Addr(), mb5.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, mb5.BrokerID(), replicas, isr)
+	mdr.AddTopicPartition("my_topic", 0, mb5.BrokerID(), replicas, isr, NoError)
+	mdr.AddTopicPartition("my_topic", 1, mb5.BrokerID(), replicas, isr, LeaderNotAvailable)
 	mb1.Returns(mdr)
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
+	config := NewClientConfig()
+	config.MetadataRetries = 0
+	client, err := NewClient("client_id", []string{mb1.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -81,10 +84,17 @@ func TestClientMetadata(t *testing.T) {
 	parts, err := client.Partitions("my_topic")
 	if err != nil {
 		t.Error(err)
-	} else if len(parts) != 1 || parts[0] != 0 {
+	} else if len(parts) != 2 || parts[0] != 0 || parts[1] != 1 {
 		t.Error("Client returned incorrect partitions for my_topic:", parts)
 	}
 
+	parts, err = client.WritablePartitions("my_topic")
+	if err != nil {
+		t.Error(err)
+	} else if len(parts) != 1 || parts[0] != 0 {
+		t.Error("Client returned incorrect writable partitions for my_topic:", parts)
+	}
+
 	tst, err := client.Leader("my_topic", 0)
 	if err != nil {
 		t.Error(err)
@@ -122,16 +132,13 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	mb1.Returns(mdr)
 
 	mdr2 := new(MetadataResponse)
-	mdr2.AddTopicPartition("my_topic", 0xb, mb5.BrokerID(), nil, nil)
+	mdr2.AddTopicPartition("my_topic", 0xb, mb5.BrokerID(), nil, nil, NoError)
 	mb5.Returns(mdr2)
 
 	client, err := NewClient("clientID", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer safeClose(t, client)
-	defer mb1.Close()
-	defer mb5.Close()
 
 	parts, err := client.Partitions("my_topic")
 	if err != nil {
@@ -148,4 +155,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	}
 
 	client.disconnectBroker(tst)
+	mb5.Close()
+	mb1.Close()
+	safeClose(t, client)
 }

+ 4 - 4
consumer_test.go

@@ -19,7 +19,7 @@ func TestSimpleConsumer(t *testing.T) {
 
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil)
+	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
 	mb1.Returns(mdr)
 
 	for i := 0; i < 10; i++ {
@@ -62,7 +62,7 @@ func TestConsumerRawOffset(t *testing.T) {
 
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil)
+	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
 	mb1.Returns(mdr)
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
@@ -95,7 +95,7 @@ func TestConsumerLatestOffset(t *testing.T) {
 
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil)
+	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
 	mb1.Returns(mdr)
 
 	or := new(OffsetResponse)
@@ -130,7 +130,7 @@ func TestConsumerPrelude(t *testing.T) {
 
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil)
+	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
 	mb1.Returns(mdr)
 
 	fr := new(FetchResponse)

+ 0 - 3
errors.go

@@ -12,9 +12,6 @@ var OutOfBrokers = errors.New("kafka: Client has run out of available brokers to
 // ClosedClient is the error returned when a method is called on a client that has been closed.
 var ClosedClient = errors.New("kafka: Tried to use a client that was closed.")
 
-// NoSuchTopic is the error returned when the supplied topic is rejected by the Kafka servers.
-var NoSuchTopic = errors.New("kafka: Topic not recognized by brokers.")
-
 // IncompleteResponse is the error returned when the server returns a syntactically valid response, but it does
 // not contain the expected information.
 var IncompleteResponse = errors.New("kafka: Response did not contain all the expected topic/partition blocks.")
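
Callers that matched the removed NoSuchTopic sentinel now need to compare against the protocol-level KError instead. A minimal migration sketch (ensureTopicKnown is a hypothetical helper, not part of sarama):

package example

import "github.com/Shopify/sarama"

// ensureTopicKnown shows the migration: code that previously compared
// against sarama.NoSuchTopic now matches the Kafka protocol error.
func ensureTopicKnown(client *sarama.Client, topic string) ([]int32, error) {
	partitions, err := client.Partitions(topic)
	if err == sarama.UnknownTopicOrPartition { // was: err == sarama.NoSuchTopic
		// The brokers do not recognize this topic at all.
		return nil, err
	}
	return partitions, err
}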

+ 2 - 1
metadata_response.go

@@ -182,7 +182,7 @@ func (m *MetadataResponse) AddBroker(addr string, id int32) {
 	m.Brokers = append(m.Brokers, &Broker{id: id, addr: addr})
 }
 
-func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32) {
+func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) {
 	var match *TopicMetadata
 
 	for _, tm := range m.Topics {
@@ -216,5 +216,6 @@ foundPartition:
 	pmatch.Leader = brokerID
 	pmatch.Replicas = replicas
 	pmatch.Isr = isr
+	pmatch.Err = err
 
 }

+ 32 - 11
partitioner.go

@@ -11,7 +11,12 @@ import (
 // decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided
 // as simple default implementations.
 type Partitioner interface {
-	Partition(key Encoder, numPartitions int32) int32
+	Partition(key Encoder, numPartitions int32) (int32, error) // Partition takes the key and partition count and chooses a partition
+
+	// RequiresConsistency indicates to the user of the partitioner whether the mapping of key->partition is consistent or not.
+	// Specifically, if a partitioner requires consistency then it must be allowed to choose from all partitions (even ones known to
+	// be unavailable), and its choice must be respected by the caller. The obvious example is the HashPartitioner.
+	RequiresConsistency() bool
 }
 
 // PartitionerConstructor is the type for a function capable of constructing new Partitioners.
@@ -28,8 +33,12 @@ func NewRandomPartitioner() Partitioner {
 	return p
 }
 
-func (p *RandomPartitioner) Partition(key Encoder, numPartitions int32) int32 {
-	return int32(p.generator.Intn(int(numPartitions)))
+func (p *RandomPartitioner) Partition(key Encoder, numPartitions int32) (int32, error) {
+	return int32(p.generator.Intn(int(numPartitions))), nil
+}
+
+func (p *RandomPartitioner) RequiresConsistency() bool {
+	return false
 }
 
 // RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.
@@ -41,13 +50,17 @@ func NewRoundRobinPartitioner() Partitioner {
 	return &RoundRobinPartitioner{}
 }
 
-func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) int32 {
+func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) (int32, error) {
 	if p.partition >= numPartitions {
 		p.partition = 0
 	}
 	ret := p.partition
 	p.partition++
-	return ret
+	return ret, nil
+}
+
+func (p *RoundRobinPartitioner) RequiresConsistency() bool {
+	return false
 }
 
 // HashPartitioner implements the Partitioner interface. If the key is nil, or fails to encode, then a random partition
@@ -65,24 +78,28 @@ func NewHashPartitioner() Partitioner {
 	return p
 }
 
-func (p *HashPartitioner) Partition(key Encoder, numPartitions int32) int32 {
+func (p *HashPartitioner) Partition(key Encoder, numPartitions int32) (int32, error) {
 	if key == nil {
 		return p.random.Partition(key, numPartitions)
 	}
 	bytes, err := key.Encode()
 	if err != nil {
-		return p.random.Partition(key, numPartitions)
+		return -1, err
 	}
 	p.hasher.Reset()
 	_, err = p.hasher.Write(bytes)
 	if err != nil {
-		return p.random.Partition(key, numPartitions)
+		return -1, err
 	}
 	hash := int32(p.hasher.Sum32())
 	if hash < 0 {
 		hash = -hash
 	}
-	return hash % numPartitions
+	return hash % numPartitions, nil
+}
+
+func (p *HashPartitioner) RequiresConsistency() bool {
+	return true
 }
 
 // ConstantPartitioner implements the Partitioner interface by just returning a constant value.
@@ -90,6 +107,10 @@ type ConstantPartitioner struct {
 	Constant int32
 }
 
-func (p *ConstantPartitioner) Partition(key Encoder, numPartitions int32) int32 {
-	return p.Constant
+func (p *ConstantPartitioner) Partition(key Encoder, numPartitions int32) (int32, error) {
+	return p.Constant, nil
+}
+
+func (p *ConstantPartitioner) RequiresConsistency() bool {
+	return true
 }
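
Implementing the widened interface from outside the package takes two methods. A minimal sketch of a custom partitioner under the new contract (KeyLengthPartitioner is hypothetical, not part of this PR):

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

// KeyLengthPartitioner routes messages by the length of the encoded key.
type KeyLengthPartitioner struct{}

func (p *KeyLengthPartitioner) Partition(key sarama.Encoder, numPartitions int32) (int32, error) {
	if key == nil {
		return 0, nil
	}
	bytes, err := key.Encode()
	if err != nil {
		// Under the new contract, encoding failures surface as errors
		// rather than silently falling back to a random partition.
		return -1, err
	}
	// The producer guarantees numPartitions > 0 before calling Partition.
	return int32(len(bytes)) % numPartitions, nil
}

// The key fully determines the partition, so the producer must offer all
// partitions (not just writable ones) to keep the mapping stable.
func (p *KeyLengthPartitioner) RequiresConsistency() bool {
	return true
}

func main() {
	var p sarama.Partitioner = &KeyLengthPartitioner{}
	choice, err := p.Partition(sarama.StringEncoder("hello"), 8)
	fmt.Println(choice, err, p.RequiresConsistency())
}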

+ 38 - 10
partitioner_test.go

@@ -6,12 +6,18 @@ import (
 )
 
 func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, key Encoder, numPartitions int32) {
-	choice := partitioner.Partition(key, numPartitions)
+	choice, err := partitioner.Partition(key, numPartitions)
+	if err != nil {
+		t.Error(partitioner, err)
+	}
 	if choice < 0 || choice >= numPartitions {
 		t.Error(partitioner, "returned partition", choice, "outside of range for", key)
 	}
 	for i := 1; i < 50; i++ {
-		newChoice := partitioner.Partition(key, numPartitions)
+		newChoice, err := partitioner.Partition(key, numPartitions)
+		if err != nil {
+			t.Error(partitioner, err)
+		}
 		if newChoice != choice {
 			t.Error(partitioner, "returned partition", newChoice, "inconsistent with", choice, ".")
 		}
@@ -21,13 +27,19 @@ func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, key Enc
 func TestRandomPartitioner(t *testing.T) {
 	partitioner := NewRandomPartitioner()
 
-	choice := partitioner.Partition(nil, 1)
+	choice, err := partitioner.Partition(nil, 1)
+	if err != nil {
+		t.Error(partitioner, err)
+	}
 	if choice != 0 {
 		t.Error("Returned non-zero partition when only one available.")
 	}
 
 	for i := 1; i < 50; i++ {
-		choice := partitioner.Partition(nil, 50)
+		choice, err := partitioner.Partition(nil, 50)
+		if err != nil {
+			t.Error(partitioner, err)
+		}
 		if choice < 0 || choice >= 50 {
 			t.Error("Returned partition", choice, "outside of range.")
 		}
@@ -37,14 +49,20 @@ func TestRandomPartitioner(t *testing.T) {
 func TestRoundRobinPartitioner(t *testing.T) {
 	partitioner := RoundRobinPartitioner{}
 
-	choice := partitioner.Partition(nil, 1)
+	choice, err := partitioner.Partition(nil, 1)
+	if err != nil {
+		t.Error(partitioner, err)
+	}
 	if choice != 0 {
 		t.Error("Returned non-zero partition when only one available.")
 	}
 
 	var i int32
 	for i = 1; i < 50; i++ {
-		choice := partitioner.Partition(nil, 7)
+		choice, err := partitioner.Partition(nil, 7)
+		if err != nil {
+			t.Error(partitioner, err)
+		}
 		if choice != i%7 {
 			t.Error("Returned partition", choice, "expecting", i%7)
 		}
@@ -54,13 +72,19 @@ func TestRoundRobinPartitioner(t *testing.T) {
 func TestHashPartitioner(t *testing.T) {
 	partitioner := NewHashPartitioner()
 
-	choice := partitioner.Partition(nil, 1)
+	choice, err := partitioner.Partition(nil, 1)
+	if err != nil {
+		t.Error(partitioner, err)
+	}
 	if choice != 0 {
 		t.Error("Returned non-zero partition when only one available.")
 	}
 
 	for i := 1; i < 50; i++ {
-		choice := partitioner.Partition(nil, 50)
+		choice, err := partitioner.Partition(nil, 50)
+		if err != nil {
+			t.Error(partitioner, err)
+		}
 		if choice < 0 || choice >= 50 {
 			t.Error("Returned partition", choice, "outside of range for nil key.")
 		}
@@ -74,10 +98,14 @@ func TestHashPartitioner(t *testing.T) {
 }
 
 func TestConstantPartitioner(t *testing.T) {
-	partitioner := &ConstantPartitioner{Constant: 0}
+	var partitioner Partitioner
+	partitioner = &ConstantPartitioner{Constant: 0}
 
 	for i := 1; i < 50; i++ {
-		choice := partitioner.Partition(nil, 50)
+		choice, err := partitioner.Partition(nil, 50)
+		if err != nil {
+			t.Error(partitioner, err)
+		}
 		if choice != 0 {
 			t.Error("Returned partition", choice, "instead of 0.")
 		}

+ 13 - 3
producer.go

@@ -617,7 +617,15 @@ func (p *Producer) retryHandler() {
 // utility functions
 
 func (p *Producer) assignPartition(partitioner Partitioner, msg *MessageToSend) error {
-	partitions, err := p.client.Partitions(msg.Topic)
+	var partitions []int32
+	var err error
+
+	if partitioner.RequiresConsistency() {
+		partitions, err = p.client.Partitions(msg.Topic)
+	} else {
+		partitions, err = p.client.WritablePartitions(msg.Topic)
+	}
+
 	if err != nil {
 		return err
 	}
@@ -628,9 +636,11 @@ func (p *Producer) assignPartition(partitioner Partitioner, msg *MessageToSend)
 		return LeaderNotAvailable
 	}
 
-	choice := partitioner.Partition(msg.Key, numPartitions)
+	choice, err := partitioner.Partition(msg.Key, numPartitions)
 
-	if choice < 0 || choice >= numPartitions {
+	if err != nil {
+		return err
+	} else if choice < 0 || choice >= numPartitions {
 		return InvalidPartition
 	}
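
The dispatch above is the heart of the change; restated as a free function it reads as follows (choosePartitions is illustrative only, not part of the sarama API):

package example

import "github.com/Shopify/sarama"

// choosePartitions mirrors assignPartition's new selection rule:
// consistent partitioners must see every partition so key->partition
// stays stable across leader elections; everyone else only needs the
// partitions that can currently accept writes.
func choosePartitions(client *sarama.Client, p sarama.Partitioner, topic string) ([]int32, error) {
	if p.RequiresConsistency() {
		return client.Partitions(topic)
	}
	return client.WritablePartitions(topic)
}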
 

+ 8 - 8
producer_test.go

@@ -21,7 +21,7 @@ func TestSimpleProducer(t *testing.T) {
 
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, 2, nil, nil)
+	response1.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
 	broker1.Returns(response1)
 
 	response2 := new(ProduceResponse)
@@ -59,7 +59,7 @@ func TestConcurrentSimpleProducer(t *testing.T) {
 
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, 2, nil, nil)
+	response1.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
 	broker1.Returns(response1)
 
 	response2 := new(ProduceResponse)
@@ -105,7 +105,7 @@ func TestProducer(t *testing.T) {
 
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil)
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
 	broker1.Returns(response1)
 
 	response2 := new(ProduceResponse)
@@ -153,7 +153,7 @@ func TestProducerMultipleFlushes(t *testing.T) {
 
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil)
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
 	broker1.Returns(response1)
 
 	response2 := new(ProduceResponse)
@@ -208,8 +208,8 @@ func TestProducerMultipleBrokers(t *testing.T) {
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
 	response1.AddBroker(broker3.Addr(), broker3.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil)
-	response1.AddTopicPartition("my_topic", 1, broker3.BrokerID(), nil, nil)
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
+	response1.AddTopicPartition("my_topic", 1, broker3.BrokerID(), nil, nil, NoError)
 	broker1.Returns(response1)
 
 	response2 := new(ProduceResponse)
@@ -261,7 +261,7 @@ func TestProducerFailureRetry(t *testing.T) {
 
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil)
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
 	broker1.Returns(response1)
 
 	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
@@ -287,7 +287,7 @@ func TestProducerFailureRetry(t *testing.T) {
 
 	response3 := new(MetadataResponse)
 	response3.AddBroker(broker3.Addr(), broker3.BrokerID())
-	response3.AddTopicPartition("my_topic", 0, broker3.BrokerID(), nil, nil)
+	response3.AddTopicPartition("my_topic", 0, broker3.BrokerID(), nil, nil, NoError)
 	broker2.Returns(response3)
 
 	response4 := new(ProduceResponse)