Dimitrij Denissenko 7 年之前
父节点
当前提交
fde71cc31f
共有 2 个文件被更改,包括 8 次插入6 次删除
  1. 1 4
      consumer_group.go
  2. 7 2
      offset_manager.go

+ 1 - 4
consumer_group.go

@@ -472,13 +472,10 @@ type consumerGroupSession struct {
 
 func newConsumerGroupSession(parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
 	// init offset manager
-	om, err := NewOffsetManagerFromClient(parent.groupID, parent.client)
+	offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
 	if err != nil {
 		return nil, err
 	}
-	offsets := om.(*offsetManager)
-	offsets.memberID = memberID
-	offsets.generation = generationID
 
 	// init session
 	sess := &consumerGroupSession{

+ 7 - 2
offset_manager.go

@@ -44,6 +44,10 @@ type offsetManager struct {
 // NewOffsetManagerFromClient creates a new OffsetManager from the given client.
 // It is still necessary to call Close() on the underlying client when finished with the partition manager.
 func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) {
+	return newOffsetManagerFromClient(group, "", GroupGenerationUndefined, client)
+}
+
+func newOffsetManagerFromClient(group, memberID string, generation int32, client Client) (*offsetManager, error) {
 	// Check that we are not dealing with a closed Client before processing any other arguments
 	if client.Closed() {
 		return nil, ErrClosedClient
@@ -57,7 +61,8 @@ func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, err
 		ticker: time.NewTicker(conf.Consumer.Offsets.CommitInterval),
 		poms:   make(map[string]map[int32]*partitionOffsetManager),
 
-		generation: GroupGenerationUndefined,
+		memberID:   memberID,
+		generation: generation,
 
 		closing: make(chan none),
 		closed:  make(chan none),
@@ -257,8 +262,8 @@ func (om *offsetManager) constructRequest() *OffsetCommitRequest {
 		r = &OffsetCommitRequest{
 			Version:                 2,
 			RetentionTime:           int64(om.conf.Consumer.Offsets.Retention / time.Millisecond),
-			ConsumerID:              om.memberID,
 			ConsumerGroup:           om.group,
+			ConsumerID:              om.memberID,
 			ConsumerGroupGeneration: om.generation,
 		}