Dimitrij Denissenko 10 лет назад
Родитель
Сommit
914efc548f
3 измененных файлов с 25 добавлено и 15 удалено
  1. 14 0
      config.go
  2. 8 12
      group_members.go
  3. 3 3
      group_members_test.go

+ 14 - 0
config.go

@@ -276,6 +276,12 @@ func (c *Config) Validate() error {
 	if c.Consumer.MaxWaitTime%time.Millisecond != 0 {
 		Logger.Println("Consumer.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.")
 	}
+	if c.Group.Heartbeat.Interval%time.Millisecond != 0 {
+		Logger.Println("Group.Heartbeat.Interval only supports millisecond precision; nanoseconds will be truncated.")
+	}
+	if c.Group.Session.Timeout%time.Millisecond != 0 {
+		Logger.Println("Group.Session.Timeout only supports millisecond precision; nanoseconds will be truncated.")
+	}
 	if c.ClientID == "sarama" {
 		Logger.Println("ClientID is the default of 'sarama', you should consider setting it to something application-specific.")
 	}
@@ -304,6 +310,14 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Metadata.RefreshFrequency must be >= 0")
 	}
 
+	// validate the Group values
+	switch {
+	case c.Group.Heartbeat.Interval <= 0:
+		return ConfigurationError("Group.Heartbeat.Interval must be > 0")
+	case c.Group.Session.Timeout <= 0:
+		return ConfigurationError("Group.Session.Timeout must be > 0")
+	}
+
 	// validate the Producer values
 	switch {
 	case c.Producer.MaxMessageBytes <= 0:

+ 8 - 12
group_members.go

@@ -38,15 +38,10 @@ func (m *GroupMemberMetadata) decode(pd packetDecoder) (err error) {
 
 type GroupMemberAssignment struct {
 	Version  int16
-	Topics   []GroupMemberAssignedTopic
+	Topics   map[string][]int32
 	UserData []byte
 }
 
-type GroupMemberAssignedTopic struct {
-	Topic      string
-	Partitions []int32
-}
-
 func (m *GroupMemberAssignment) encode(pe packetEncoder) error {
 	pe.putInt16(m.Version)
 
@@ -54,11 +49,11 @@ func (m *GroupMemberAssignment) encode(pe packetEncoder) error {
 		return err
 	}
 
-	for _, topic := range m.Topics {
-		if err := pe.putString(topic.Topic); err != nil {
+	for topic, partitions := range m.Topics {
+		if err := pe.putString(topic); err != nil {
 			return err
 		}
-		if err := pe.putInt32Array(topic.Partitions); err != nil {
+		if err := pe.putInt32Array(partitions); err != nil {
 			return err
 		}
 	}
@@ -80,12 +75,13 @@ func (m *GroupMemberAssignment) decode(pd packetDecoder) (err error) {
 		return
 	}
 
-	m.Topics = make([]GroupMemberAssignedTopic, topicLen)
+	m.Topics = make(map[string][]int32, topicLen)
 	for i := 0; i < topicLen; i++ {
-		if m.Topics[i].Topic, err = pd.getString(); err != nil {
+		var topic string
+		if topic, err = pd.getString(); err != nil {
 			return
 		}
-		if m.Topics[i].Partitions, err = pd.getInt32Array(); err != nil {
+		if m.Topics[topic], err = pd.getInt32Array(); err != nil {
 			return
 		}
 	}

+ 3 - 3
group_members_test.go

@@ -53,9 +53,9 @@ func TestGroupMemberMetadata(t *testing.T) {
 func TestGroupMemberAssignment(t *testing.T) {
 	amt := &GroupMemberAssignment{
 		Version: 1,
-		Topics: []GroupMemberAssignedTopic{
-			{Topic: "one", Partitions: []int32{0, 2, 4}},
-			{Topic: "two", Partitions: []int32{1, 3}},
+		Topics: map[string][]int32{
+			"one": []int32{0, 2, 4},
+			"two": []int32{1, 3},
 		},
 		UserData: []byte{0x01, 0x02, 0x03},
 	}