Prechádzať zdrojové kódy

added offline replicas operation to client for kafka version >= 1.x

drsoares 5 rokov pred
rodič
commit
e3ecf7b088
10 zmenil súbory, kde vykonal 219 pridanie a 59 odobranie
  1. 3 1
      admin.go
  2. 38 0
      admin_test.go
  3. 29 29
      async_producer_test.go
  4. 32 2
      client.go
  5. 101 13
      client_test.go
  6. 3 2
      metadata_request.go
  7. 2 1
      metadata_response.go
  8. 3 3
      mockresponses.go
  9. 2 2
      offset_manager_test.go
  10. 6 6
      sync_producer_test.go

+ 3 - 1
admin.go

@@ -183,7 +183,9 @@ func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetada
 		AllowAutoTopicCreation: false,
 	}
 
-	if ca.conf.Version.IsAtLeast(V0_11_0_0) {
+	if ca.conf.Version.IsAtLeast(V1_0_0_0) {
+		request.Version = 5
+	} else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
 		request.Version = 4
 	}
 

+ 38 - 0
admin_test.go

@@ -620,6 +620,44 @@ func TestDescribeTopic(t *testing.T) {
 	}
 }
 
+func TestDescribeTopicWithVersion0_11(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetLeader("my_topic", 0, seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+	})
+
+	config := NewConfig()
+	config.Version = V0_11_0_0
+
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	topics, err := admin.DescribeTopics([]string{"my_topic"})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(topics) != 1 {
+		t.Fatalf("Expected 1 result, got %v", len(topics))
+	}
+
+	if topics[0].Name != "my_topic" {
+		t.Fatalf("Incorrect topic name: %v", topics[0].Name)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
 func TestDescribeConsumerGroup(t *testing.T) {
 	seedBroker := NewMockBroker(t, 1)
 	defer seedBroker.Close()

+ 29 - 29
async_producer_test.go

@@ -100,7 +100,7 @@ func TestAsyncProducer(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodSuccess := new(ProduceResponse)
@@ -149,7 +149,7 @@ func TestAsyncProducerMultipleFlushes(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodSuccess := new(ProduceResponse)
@@ -186,8 +186,8 @@ func TestAsyncProducerMultipleBrokers(t *testing.T) {
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
 	metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, ErrNoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodResponse0 := new(ProduceResponse)
@@ -224,7 +224,7 @@ func TestAsyncProducerCustomPartitioner(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodResponse := new(ProduceResponse)
@@ -267,7 +267,7 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
 
 	metadataLeader1 := new(MetadataResponse)
 	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
-	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
+	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader1)
 
 	config := NewConfig()
@@ -289,7 +289,7 @@ func TestAsyncProducerFailureRetry(t *testing.T) {
 
 	metadataLeader2 := new(MetadataResponse)
 	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
-	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
+	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
 	leader1.Returns(metadataLeader2)
 
 	prodSuccess := new(ProduceResponse)
@@ -317,8 +317,8 @@ func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
 
 		metadataLeader1 := new(MetadataResponse)
 		metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
-		metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
-		metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, ErrNoError)
+		metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
+		metadataLeader1.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, nil, ErrNoError)
 		seedBroker.Returns(metadataLeader1)
 
 		config := NewConfig()
@@ -344,8 +344,8 @@ func TestAsyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage), Partition: 0}
 		metadataLeader2 := new(MetadataResponse)
 		metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
-		metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
-		metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, ErrNoError)
+		metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
+		metadataLeader2.AddTopicPartition("my_topic", 1, leader2.BrokerID(), nil, nil, nil, ErrNoError)
 		leader1.Returns(metadataLeader2)
 		leader1.Returns(metadataLeader2)
 
@@ -376,7 +376,7 @@ func TestAsyncProducerEncoderFailures(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodSuccess := new(ProduceResponse)
@@ -416,7 +416,7 @@ func TestAsyncProducerBrokerBounce(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodSuccess := new(ProduceResponse)
@@ -456,7 +456,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 
 	metadataLeader1 := new(MetadataResponse)
 	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
-	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
+	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader1)
 
 	config := NewConfig()
@@ -479,7 +479,7 @@ func TestAsyncProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 	// ok fine, tell it to go to leader2 finally
 	metadataLeader2 := new(MetadataResponse)
 	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
-	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
+	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader2)
 
 	prodSuccess := new(ProduceResponse)
@@ -499,7 +499,7 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
 
 	metadataLeader1 := new(MetadataResponse)
 	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
-	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
+	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader1)
 
 	config := NewConfig()
@@ -521,7 +521,7 @@ func TestAsyncProducerMultipleRetries(t *testing.T) {
 
 	metadataLeader2 := new(MetadataResponse)
 	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
-	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
+	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
 
 	seedBroker.Returns(metadataLeader2)
 	leader2.Returns(prodNotLeader)
@@ -555,7 +555,7 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
 
 	metadataLeader1 := new(MetadataResponse)
 	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
-	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
+	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader1)
 
 	config := NewConfig()
@@ -582,7 +582,7 @@ func TestAsyncProducerMultipleRetriesWithBackoffFunc(t *testing.T) {
 
 	metadataLeader2 := new(MetadataResponse)
 	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
-	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
+	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
 
 	leader1.Returns(prodNotLeader)
 	seedBroker.Returns(metadataLeader2)
@@ -623,7 +623,7 @@ func TestAsyncProducerOutOfRetries(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	config := NewConfig()
@@ -679,8 +679,8 @@ func TestAsyncProducerRetryWithReferenceOpen(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	config := NewConfig()
@@ -735,8 +735,8 @@ func TestAsyncProducerFlusherRetryCondition(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	config := NewConfig()
@@ -801,7 +801,7 @@ func TestAsyncProducerRetryShutdown(t *testing.T) {
 
 	metadataLeader := new(MetadataResponse)
 	metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader)
 
 	config := NewConfig()
@@ -850,7 +850,7 @@ func TestAsyncProducerNoReturns(t *testing.T) {
 
 	metadataLeader := new(MetadataResponse)
 	metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader)
 
 	config := NewConfig()
@@ -892,7 +892,7 @@ func TestAsyncProducerIdempotentGoldenPath(t *testing.T) {
 		ControllerID: 1,
 	}
 	metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
 	broker.Returns(metadataResponse)
 
 	initProducerID := &InitProducerIDResponse{
@@ -950,7 +950,7 @@ func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) {
 			ControllerID: 1,
 		}
 		metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
-		metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
+		metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
 
 		initProducerIDResponse := &InitProducerIDResponse{
 			ThrottleTime:  0,
@@ -1092,7 +1092,7 @@ func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) {
 		ControllerID: 1,
 	}
 	metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
 	broker.Returns(metadataResponse)
 
 	initProducerID := &InitProducerIDResponse{

+ 32 - 2
client.go

@@ -46,6 +46,10 @@ type Client interface {
 	// the partition leader.
 	InSyncReplicas(topic string, partitionID int32) ([]int32, error)
 
+	// OfflineReplicas returns the set of all offline replica IDs for the given
+	// partition. Offline replicas are replicas which are offline
+	OfflineReplicas(topic string, partitionID int32) ([]int32, error)
+
 	// RefreshMetadata takes a list of topics and queries the cluster to refresh the
 	// available metadata for those topics. If no topics are provided, it will refresh
 	// metadata for all topics.
@@ -373,6 +377,31 @@ func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32,
 	return dupInt32Slice(metadata.Isr), nil
 }
 
+func (client *client) OfflineReplicas(topic string, partitionID int32) ([]int32, error) {
+	if client.Closed() {
+		return nil, ErrClosedClient
+	}
+
+	metadata := client.cachedMetadata(topic, partitionID)
+
+	if metadata == nil {
+		err := client.RefreshMetadata(topic)
+		if err != nil {
+			return nil, err
+		}
+		metadata = client.cachedMetadata(topic, partitionID)
+	}
+
+	if metadata == nil {
+		return nil, ErrUnknownTopicOrPartition
+	}
+
+	if metadata.Err == ErrReplicaNotAvailable {
+		return dupInt32Slice(metadata.OfflineReplicas), metadata.Err
+	}
+	return dupInt32Slice(metadata.OfflineReplicas), nil
+}
+
 func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
 	if client.Closed() {
 		return nil, ErrClosedClient
@@ -728,11 +757,12 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int)
 		}
 
 		req := &MetadataRequest{Topics: topics}
-		if client.conf.Version.IsAtLeast(V0_10_0_0) {
+		if client.conf.Version.IsAtLeast(V1_0_0_0) {
+			req.Version = 5
+		} else if client.conf.Version.IsAtLeast(V0_10_0_0) {
 			req.Version = 1
 		}
 		response, err := broker.GetMetadata(req)
-
 		switch err.(type) {
 		case nil:
 			allKnownMetaData := len(topics) == 0

+ 101 - 13
client_test.go

@@ -37,8 +37,8 @@ func TestCachedPartitions(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker("localhost:12345", 2)
-	metadataResponse.AddTopicPartition("my_topic", 0, 2, replicas, isr, ErrNoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, 2, replicas, isr, ErrLeaderNotAvailable)
+	metadataResponse.AddTopicPartition("my_topic", 0, 2, replicas, isr, []int32{}, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, 2, replicas, isr, []int32{}, ErrLeaderNotAvailable)
 	seedBroker.Returns(metadataResponse)
 
 	config := NewConfig()
@@ -75,8 +75,8 @@ func TestClientDoesntCachePartitionsForTopicsWithErrors(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(seedBroker.Addr(), seedBroker.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 1, replicas[0], replicas, replicas, ErrNoError)
-	metadataResponse.AddTopicPartition("my_topic", 2, replicas[0], replicas, replicas, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, replicas[0], replicas, replicas, []int32{}, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 2, replicas[0], replicas, replicas, []int32{}, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	config := NewConfig()
@@ -147,8 +147,8 @@ func TestClientMetadata(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, ErrNoError)
-	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, ErrLeaderNotAvailable)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, []int32{}, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, []int32{}, ErrLeaderNotAvailable)
 	seedBroker.Returns(metadataResponse)
 
 	config := NewConfig()
@@ -213,13 +213,101 @@ func TestClientMetadata(t *testing.T) {
 	safeClose(t, client)
 }
 
+func TestClientMetadataWithOfflineReplicas(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	leader := NewMockBroker(t, 5)
+
+	replicas := []int32{1, 2, 3}
+	isr := []int32{1, 2}
+	offlineReplicas := []int32{3}
+
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, offlineReplicas, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, []int32{}, ErrNoError)
+	metadataResponse.Version = 5
+
+	seedBroker.Returns(metadataResponse)
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+	config.Metadata.Retry.Max = 0
+	client, err := NewClient([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	topics, err := client.Topics()
+	if err != nil {
+		t.Error(err)
+	} else if len(topics) != 1 || topics[0] != "my_topic" {
+		t.Error("Client returned incorrect topics:", topics)
+	}
+
+	parts, err := client.Partitions("my_topic")
+	if err != nil {
+		t.Error(err)
+	} else if len(parts) != 2 || parts[0] != 0 || parts[1] != 1 {
+		t.Error("Client returned incorrect partitions for my_topic:", parts)
+	}
+
+	parts, err = client.WritablePartitions("my_topic")
+	if err != nil {
+		t.Error(err)
+	} else if len(parts) != 2 {
+		t.Error("Client returned incorrect writable partitions for my_topic:", parts)
+	}
+
+	tst, err := client.Leader("my_topic", 0)
+	if err != nil {
+		t.Error(err)
+	} else if tst.ID() != 5 {
+		t.Error("Leader for my_topic had incorrect ID.")
+	}
+
+	replicas, err = client.Replicas("my_topic", 0)
+	if err != nil {
+		t.Error(err)
+	} else if replicas[0] != 1 {
+		t.Error("Incorrect (or sorted) replica")
+	} else if replicas[1] != 2 {
+		t.Error("Incorrect (or sorted) replica")
+	} else if replicas[2] != 3 {
+		t.Error("Incorrect (or sorted) replica")
+	}
+
+	isr, err = client.InSyncReplicas("my_topic", 0)
+	if err != nil {
+		t.Error(err)
+	} else if len(isr) != 2 {
+		t.Error("Client returned incorrect ISRs for partition:", isr)
+	} else if isr[0] != 1 {
+		t.Error("Incorrect (or sorted) ISR:", isr)
+	} else if isr[1] != 2 {
+		t.Error("Incorrect (or sorted) ISR:", isr)
+	}
+
+	offlineReplicas, err = client.OfflineReplicas("my_topic", 0)
+	if err != nil {
+		t.Error(err)
+	} else if len(offlineReplicas) != 1 {
+		t.Error("Client returned incorrect offline replicas for partition:", offlineReplicas)
+	} else if offlineReplicas[0] != 3 {
+		t.Error("Incorrect offline replica:", offlineReplicas)
+	}
+
+	leader.Close()
+	seedBroker.Close()
+	safeClose(t, client)
+}
+
 func TestClientGetOffset(t *testing.T) {
 	seedBroker := NewMockBroker(t, 1)
 	leader := NewMockBroker(t, 2)
 	leaderAddr := leader.Addr()
 
 	metadata := new(MetadataResponse)
-	metadata.AddTopicPartition("foo", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	metadata.AddTopicPartition("foo", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
 	metadata.AddBroker(leaderAddr, leader.BrokerID())
 	seedBroker.Returns(metadata)
 
@@ -353,8 +441,8 @@ func TestClientReceivingPartialMetadata(t *testing.T) {
 
 	metadataPartial := new(MetadataResponse)
 	metadataPartial.AddTopic("new_topic", ErrLeaderNotAvailable)
-	metadataPartial.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, ErrNoError)
-	metadataPartial.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, ErrLeaderNotAvailable)
+	metadataPartial.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, []int32{}, ErrNoError)
+	metadataPartial.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, []int32{}, ErrLeaderNotAvailable)
 	seedBroker.Returns(metadataPartial)
 
 	if err := client.RefreshMetadata("new_topic"); err != nil {
@@ -396,7 +484,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	seedBroker.Returns(metadataResponse1)
 
 	metadataResponse2 := new(MetadataResponse)
-	metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse2)
 
 	client, err := NewClient([]string{seedBroker.Addr()}, nil)
@@ -533,7 +621,7 @@ func TestClientCoordinatorWithConsumerOffsetsTopic(t *testing.T) {
 	metadataResponse1 := new(MetadataResponse)
 	metadataResponse1.AddBroker(staleCoordinator.Addr(), staleCoordinator.BrokerID())
 	metadataResponse1.AddBroker(freshCoordinator.Addr(), freshCoordinator.BrokerID())
-	metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, ErrNoError)
+	metadataResponse1.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError)
 	seedBroker.Returns(metadataResponse1)
 
 	client, err := NewClient([]string{seedBroker.Addr()}, nil)
@@ -628,7 +716,7 @@ func TestClientCoordinatorWithoutConsumerOffsetsTopic(t *testing.T) {
 
 	replicas := []int32{coordinator.BrokerID()}
 	metadataResponse3 := new(MetadataResponse)
-	metadataResponse3.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, ErrNoError)
+	metadataResponse3.AddTopicPartition("__consumer_offsets", 0, replicas[0], replicas, replicas, []int32{}, ErrNoError)
 	seedBroker.Returns(metadataResponse3)
 
 	coordinatorResponse2 := new(ConsumerMetadataResponse)
@@ -687,7 +775,7 @@ func TestClientAutorefreshShutdownRace(t *testing.T) {
 	// Then return some metadata to the still-running background thread
 	leader := NewMockBroker(t, 2)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("foo", 0, leader.BrokerID(), []int32{2}, []int32{2}, ErrNoError)
+	metadataResponse.AddTopicPartition("foo", 0, leader.BrokerID(), []int32{2}, []int32{2}, []int32{}, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	<-done

+ 3 - 2
metadata_request.go

@@ -10,7 +10,7 @@ func (r *MetadataRequest) encode(pe packetEncoder) error {
 	if r.Version < 0 || r.Version > 5 {
 		return PacketEncodingError{"invalid or unsupported MetadataRequest version field"}
 	}
-	if r.Version == 0 || len(r.Topics) > 0 {
+	if r.Version == 0 || r.Version == 5 || len(r.Topics) > 0 {
 		err := pe.putArrayLength(len(r.Topics))
 		if err != nil {
 			return err
@@ -42,7 +42,7 @@ func (r *MetadataRequest) decode(pd packetDecoder, version int16) error {
 	} else {
 		topicCount := size
 		if topicCount == 0 {
-			return nil
+			goto SKIP
 		}
 
 		r.Topics = make([]string, topicCount)
@@ -54,6 +54,7 @@ func (r *MetadataRequest) decode(pd packetDecoder, version int16) error {
 			r.Topics[i] = topic
 		}
 	}
+SKIP:
 	if r.Version > 3 {
 		autoCreation, err := pd.getBool()
 		if err != nil {

+ 2 - 1
metadata_response.go

@@ -296,7 +296,7 @@ foundTopic:
 	return tmatch
 }
 
-func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) {
+func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, offline []int32, err KError) {
 	tmatch := r.AddTopic(topic, ErrNoError)
 	var pmatch *PartitionMetadata
 
@@ -316,6 +316,7 @@ foundPartition:
 	pmatch.Leader = brokerID
 	pmatch.Replicas = replicas
 	pmatch.Isr = isr
+	pmatch.OfflineReplicas = offline
 	pmatch.Err = err
 
 }

+ 3 - 3
mockresponses.go

@@ -178,7 +178,7 @@ func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
 
 	// Generate set of replicas
 	replicas := []int32{}
-
+	offlineReplicas := []int32{}
 	for _, brokerID := range mmr.brokers {
 		replicas = append(replicas, brokerID)
 	}
@@ -186,14 +186,14 @@ func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
 	if len(metadataRequest.Topics) == 0 {
 		for topic, partitions := range mmr.leaders {
 			for partition, brokerID := range partitions {
-				metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, ErrNoError)
+				metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
 			}
 		}
 		return metadataResponse
 	}
 	for _, topic := range metadataRequest.Topics {
 		for partition, brokerID := range mmr.leaders[topic] {
-			metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, ErrNoError)
+			metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
 		}
 	}
 	return metadataResponse

+ 2 - 2
offset_manager_test.go

@@ -26,8 +26,8 @@ func initOffsetManagerWithBackoffFunc(t *testing.T, retention time.Duration,
 
 	seedMeta := new(MetadataResponse)
 	seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
-	seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, ErrNoError)
-	seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, ErrNoError)
+	seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, []int32{}, ErrNoError)
+	seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, []int32{}, ErrNoError)
 	broker.Returns(seedMeta)
 
 	var err error

+ 6 - 6
sync_producer_test.go

@@ -12,7 +12,7 @@ func TestSyncProducer(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodSuccess := new(ProduceResponse)
@@ -60,7 +60,7 @@ func TestSyncProducerBatch(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodSuccess := new(ProduceResponse)
@@ -108,7 +108,7 @@ func TestConcurrentSyncProducer(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataResponse)
 
 	prodSuccess := new(ProduceResponse)
@@ -151,7 +151,7 @@ func TestSyncProducerToNonExistingTopic(t *testing.T) {
 
 	metadataResponse := new(MetadataResponse)
 	metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
-	metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
+	metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, nil, ErrNoError)
 	broker.Returns(metadataResponse)
 
 	config := NewConfig()
@@ -184,7 +184,7 @@ func TestSyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
 
 	metadataLeader1 := new(MetadataResponse)
 	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
-	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, ErrNoError)
+	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, nil, ErrNoError)
 	seedBroker.Returns(metadataLeader1)
 
 	config := NewConfig()
@@ -207,7 +207,7 @@ func TestSyncProducerRecoveryWithRetriesDisabled(t *testing.T) {
 
 	metadataLeader2 := new(MetadataResponse)
 	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
-	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, ErrNoError)
+	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, nil, ErrNoError)
 	leader1.Returns(metadataLeader2)
 	prodSuccess := new(ProduceResponse)
 	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)