فهرست منبع

Merge branch 'master' into master

Justfly 8 سال پیش
والد
کامیت
8a87ca8bca
5فایلهای تغییر یافته به همراه21 افزوده شده و 22 حذف شده
  1. 1 1
      .travis.yml
  2. 1 1
      README.md
  3. 11 0
      broker.go
  4. 7 2
      client.go
  5. 1 18
      message_test.go

+ 1 - 1
.travis.yml

@@ -2,6 +2,7 @@ language: go
 go:
 go:
 - 1.8.x
 - 1.8.x
 - 1.9.x
 - 1.9.x
+- 1.10.x
 
 
 env:
 env:
   global:
   global:
@@ -11,7 +12,6 @@ env:
   - KAFKA_HOSTNAME=localhost
   - KAFKA_HOSTNAME=localhost
   - DEBUG=true
   - DEBUG=true
   matrix:
   matrix:
-  - KAFKA_VERSION=0.10.2.1
   - KAFKA_VERSION=0.11.0.2
   - KAFKA_VERSION=0.11.0.2
   - KAFKA_VERSION=1.0.0
   - KAFKA_VERSION=1.0.0
 
 

+ 1 - 1
README.md

@@ -21,7 +21,7 @@ You might also want to look at the [Frequently Asked Questions](https://github.c
 Sarama provides a "2 releases + 2 months" compatibility guarantee: we support
 Sarama provides a "2 releases + 2 months" compatibility guarantee: we support
 the two latest stable releases of Kafka and Go, and we provide a two month
 the two latest stable releases of Kafka and Go, and we provide a two month
 grace period for older releases. This means we currently officially support
 grace period for older releases. This means we currently officially support
-Go 1.9 and 1.8, and Kafka 1.0 through 0.10, although older releases are
+Go 1.8 through 1.10, and Kafka 0.11 through 1.0, although older releases are
 still likely to work.
 still likely to work.
 
 
 Sarama follows semantic versioning and provides API stability via the gopkg.in service.
 Sarama follows semantic versioning and provides API stability via the gopkg.in service.

+ 11 - 0
broker.go

@@ -386,6 +386,17 @@ func (b *Broker) ApiVersions(request *ApiVersionsRequest) (*ApiVersionsResponse,
 	return response, nil
 	return response, nil
 }
 }
 
 
+func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
+	response := new(CreatePartitionsResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
 func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
 func (b *Broker) CreateTopics(request *CreateTopicsRequest) (*CreateTopicsResponse, error) {
 	response := new(CreateTopicsResponse)
 	response := new(CreateTopicsResponse)
 
 

+ 7 - 2
client.go

@@ -686,8 +686,9 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int)
 
 
 		switch err.(type) {
 		switch err.(type) {
 		case nil:
 		case nil:
+			allKnownMetaData := len(topics) == 0
 			// valid response, use it
 			// valid response, use it
-			shouldRetry, err := client.updateMetadata(response)
+			shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
 			if shouldRetry {
 			if shouldRetry {
 				Logger.Println("client/metadata found some partitions to be leaderless")
 				Logger.Println("client/metadata found some partitions to be leaderless")
 				return retry(err) // note: err can be nil
 				return retry(err) // note: err can be nil
@@ -711,7 +712,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int)
 }
 }
 
 
 // if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
 // if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
-func (client *client) updateMetadata(data *MetadataResponse) (retry bool, err error) {
+func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
 	client.lock.Lock()
 	client.lock.Lock()
 	defer client.lock.Unlock()
 	defer client.lock.Unlock()
 
 
@@ -725,6 +726,10 @@ func (client *client) updateMetadata(data *MetadataResponse) (retry bool, err er
 
 
 	client.controllerID = data.ControllerID
 	client.controllerID = data.ControllerID
 
 
+	if allKnownMetaData {
+		client.metadata = make(map[string]map[int32]*PartitionMetadata)
+		client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32)
+	}
 	for _, topic := range data.Topics {
 	for _, topic := range data.Topics {
 		delete(client.metadata, topic.Name)
 		delete(client.metadata, topic.Name)
 		delete(client.cachedPartitionsResults, topic.Name)
 		delete(client.cachedPartitionsResults, topic.Name)

+ 1 - 18
message_test.go

@@ -1,8 +1,6 @@
 package sarama
 package sarama
 
 
 import (
 import (
-	"runtime"
-	"strings"
 	"testing"
 	"testing"
 	"time"
 	"time"
 )
 )
@@ -31,17 +29,6 @@ var (
 		0xFF, 0xFF, 0xFF, 0xFF} // value
 		0xFF, 0xFF, 0xFF, 0xFF} // value
 
 
 	emptyGzipMessage = []byte{
 	emptyGzipMessage = []byte{
-		97, 79, 149, 90, //CRC
-		0x00,                   // magic version byte
-		0x01,                   // attribute flags
-		0xFF, 0xFF, 0xFF, 0xFF, // key
-		// value
-		0x00, 0x00, 0x00, 0x17,
-		0x1f, 0x8b,
-		0x08,
-		0, 0, 9, 110, 136, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0}
-
-	emptyGzipMessage18 = []byte{
 		132, 99, 80, 148, //CRC
 		132, 99, 80, 148, //CRC
 		0x00,                   // magic version byte
 		0x00,                   // magic version byte
 		0x01,                   // attribute flags
 		0x01,                   // attribute flags
@@ -107,11 +94,7 @@ func TestMessageEncoding(t *testing.T) {
 
 
 	message.Value = []byte{}
 	message.Value = []byte{}
 	message.Codec = CompressionGZIP
 	message.Codec = CompressionGZIP
-	if strings.HasPrefix(runtime.Version(), "go1.8") || strings.HasPrefix(runtime.Version(), "go1.9") {
-		testEncodable(t, "empty gzip", &message, emptyGzipMessage18)
-	} else {
-		testEncodable(t, "empty gzip", &message, emptyGzipMessage)
-	}
+	testEncodable(t, "empty gzip", &message, emptyGzipMessage)
 
 
 	message.Value = []byte{}
 	message.Value = []byte{}
 	message.Codec = CompressionLZ4
 	message.Codec = CompressionLZ4