Sfoglia il codice sorgente

Merge branch 'master' of https://github.com/Shopify/sarama

AJ Yoo 7 anni fa
parent
commit
4824b74e44

+ 4 - 5
.travis.yml

@@ -1,6 +1,5 @@
 language: go
 go:
-- 1.9.x
 - 1.10.x
 - 1.11.x
 
@@ -12,9 +11,9 @@ env:
   - KAFKA_HOSTNAME=localhost
   - DEBUG=true
   matrix:
-  - KAFKA_VERSION=1.0.0
-  - KAFKA_VERSION=1.1.0
-  - KAFKA_VERSION=2.0.0
+  - KAFKA_VERSION=1.1.1
+  - KAFKA_VERSION=2.0.1
+  - KAFKA_VERSION=2.1.0
 
 before_install:
 - export REPOSITORY_ROOT=${TRAVIS_BUILD_DIR}
@@ -28,7 +27,7 @@ script:
 - make test
 - make vet
 - make errcheck
-- if [ "$TRAVIS_GO_VERSION" = "1.11" ]; then make fmt; fi
+- if [[ "$TRAVIS_GO_VERSION" == 1.11* ]]; then make fmt; fi
 
 after_success:
 - bash <(curl -s https://codecov.io/bash)

+ 5 - 5
consumer_group.go

@@ -33,7 +33,7 @@ type ConsumerGroup interface {
 	//    to allow the user to perform any final tasks before a rebalance.
 	// 6. Finally, marked offsets are committed one last time before claims are released.
 	//
-	// Please note, that once a relance is triggered, sessions must be completed within
+	// Please note, that once a rebalance is triggered, sessions must be completed within
 	// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
 	// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
 	// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
@@ -267,7 +267,7 @@ func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, top
 		}
 	}
 
-	return newConsumerGroupSession(c, ctx, claims, join.MemberId, join.GenerationId, handler)
+	return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
 }
 
 func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) {
@@ -456,7 +456,7 @@ type consumerGroupSession struct {
 	hbDying, hbDead chan none
 }
 
-func newConsumerGroupSession(parent *consumerGroup, ctx context.Context, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
+func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
 	// init offset manager
 	offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
 	if err != nil {
@@ -595,7 +595,7 @@ func (s *consumerGroupSession) consume(topic string, partition int32) {
 		s.parent.handleError(err, topic, partition)
 	}
 
-	// ensure consumer is clased & drained
+	// ensure consumer is closed & drained
 	claim.AsyncClose()
 	for _, err := range claim.waitClosed() {
 		s.parent.handleError(err, topic, partition)
@@ -691,7 +691,7 @@ type ConsumerGroupHandler interface {
 	// Setup is run at the beginning of a new session, before ConsumeClaim.
 	Setup(ConsumerGroupSession) error
 
-	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exites
+	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
 	// but before the offsets are committed for the very last time.
 	Cleanup(ConsumerGroupSession) error
 

+ 2 - 0
consumer_group_members.go

@@ -1,5 +1,6 @@
 package sarama
 
+//ConsumerGroupMemberMetadata holds the metadata for a consumer group member
 type ConsumerGroupMemberMetadata struct {
 	Version  int16
 	Topics   []string
@@ -36,6 +37,7 @@ func (m *ConsumerGroupMemberMetadata) decode(pd packetDecoder) (err error) {
 	return nil
 }
 
+//ConsumerGroupMemberAssignment holds the member assignment for a consumer group
 type ConsumerGroupMemberAssignment struct {
 	Version  int16
 	Topics   map[string][]int32

+ 1 - 0
consumer_metadata_request.go

@@ -1,5 +1,6 @@
 package sarama
 
+//ConsumerMetadataRequest is used for metadata requests
 type ConsumerMetadataRequest struct {
 	ConsumerGroup string
 }

+ 1 - 0
consumer_metadata_response.go

@@ -5,6 +5,7 @@ import (
 	"strconv"
 )
 
+//ConsumerMetadataResponse holds the response for a consumer group metadata request
 type ConsumerMetadataResponse struct {
 	Err             KError
 	Coordinator     *Broker

+ 9 - 3
utils.go

@@ -155,7 +155,10 @@ var (
 	V0_11_0_2 = newKafkaVersion(0, 11, 0, 2)
 	V1_0_0_0  = newKafkaVersion(1, 0, 0, 0)
 	V1_1_0_0  = newKafkaVersion(1, 1, 0, 0)
+	V1_1_1_0  = newKafkaVersion(1, 1, 1, 0)
 	V2_0_0_0  = newKafkaVersion(2, 0, 0, 0)
+	V2_0_1_0  = newKafkaVersion(2, 0, 1, 0)
+	V2_1_0_0  = newKafkaVersion(2, 1, 0, 0)
 
 	SupportedVersions = []KafkaVersion{
 		V0_8_2_0,
@@ -174,10 +177,13 @@ var (
 		V0_11_0_2,
 		V1_0_0_0,
 		V1_1_0_0,
+		V1_1_1_0,
 		V2_0_0_0,
+		V2_0_1_0,
+		V2_1_0_0,
 	}
 	MinVersion = V0_8_2_0
-	MaxVersion = V2_0_0_0
+	MaxVersion = V2_1_0_0
 )
 
 func ParseKafkaVersion(s string) (KafkaVersion, error) {
@@ -208,7 +214,7 @@ func scanKafkaVersion(s string, pattern string, format string, v [3]*uint) error
 func (v KafkaVersion) String() string {
 	if v.version[0] == 0 {
 		return fmt.Sprintf("0.%d.%d.%d", v.version[1], v.version[2], v.version[3])
-	} else {
-		return fmt.Sprintf("%d.%d.%d", v.version[0], v.version[1], v.version[2])
 	}
+
+	return fmt.Sprintf("%d.%d.%d", v.version[0], v.version[1], v.version[2])
 }

+ 2 - 2
vagrant/install_cluster.sh

@@ -2,11 +2,11 @@
 
 set -ex
 
-TOXIPROXY_VERSION=2.0.0
+TOXIPROXY_VERSION=2.1.3
 
 mkdir -p ${KAFKA_INSTALL_ROOT}
 if [ ! -f ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz ]; then
-    wget --quiet http://apache.mirror.gtcomm.net/kafka/${KAFKA_VERSION}/kafka_2.11-${KAFKA_VERSION}.tgz -O ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz
+    wget --quiet https://www-us.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_2.11-${KAFKA_VERSION}.tgz -O ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz
 fi
 if [ ! -f ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION} ]; then
     wget --quiet https://github.com/Shopify/toxiproxy/releases/download/v${TOXIPROXY_VERSION}/toxiproxy-server-linux-amd64 -O ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION}