
Adapt for the latest offset manager implementation

Dimitrij Denissenko 7 years ago
parent
commit
6fd7e304e7
3 changed files with 55 additions and 57 deletions
  1. consumer_group.go (+24 -44)
  2. functional_consumer_group_test.go (+6 -2)
  3. offset_manager.go (+25 -11)

consumer_group.go (+24 -44)

@@ -463,24 +463,23 @@ type consumerGroupSession struct {
 	generationID int32
 	handler      ConsumerGroupHandler
 
-	offsets OffsetManager
-	claims  map[string][]int32
-	poms    map[string]map[int32]PartitionOffsetManager
-	done    chan none
-
+	claims    map[string][]int32
+	offsets   *offsetManager
+	done      chan none
 	waitGroup sync.WaitGroup
 
 	cancelOnce, releaseOnce sync.Once
 }
 
 func newConsumerGroupSession(parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
-	// create a new offset manager for the session
-	offsets, err := NewOffsetManagerFromClient(parent.groupID, parent.client)
+	// init offset manager
+	om, err := NewOffsetManagerFromClient(parent.groupID, parent.client)
 	if err != nil {
 		return nil, err
 	}
-	offsets.(*offsetManager).memberID = memberID
-	offsets.(*offsetManager).generation = generationID
+	offsets := om.(*offsetManager)
+	offsets.memberID = memberID
+	offsets.generation = generationID
 
 	// init session
 	sess := &consumerGroupSession{
@@ -488,21 +487,17 @@ func newConsumerGroupSession(parent *consumerGroup, claims map[string][]int32, m
 		memberID:     memberID,
 		generationID: generationID,
 		handler:      handler,
-
-		offsets: offsets,
-		claims:  claims,
-		poms:    make(map[string]map[int32]PartitionOffsetManager, len(claims)),
-		done:    make(chan none),
+		offsets:      offsets,
+		claims:       claims,
+		done:         make(chan none),
 	}
 
 	// start heartbeat loop
 	sess.waitGroup.Add(1)
 	go sess.heartbeatLoop()
 
-	// for each claim, create a PartitionOffsetManager
+	// create a POM for each claim
 	for topic, partitions := range claims {
-		sess.poms[topic] = make(map[int32]PartitionOffsetManager, len(partitions))
-
 		for _, partition := range partitions {
 			pom, err := offsets.ManagePartition(topic, partition)
 			if err != nil {
@@ -510,13 +505,12 @@ func newConsumerGroupSession(parent *consumerGroup, claims map[string][]int32, m
 				return nil, err
 			}
 
+			// handle POM errors
 			go func(topic string, partition int32) {
 				for err := range pom.Errors() {
 					sess.parent.handleError(err, topic, partition)
 				}
 			}(topic, partition)
-
-			sess.poms[topic][partition] = pom
 		}
 	}
 
@@ -551,18 +545,14 @@ func (s *consumerGroupSession) MemberID() string           { return s.memberID }
 func (s *consumerGroupSession) GenerationID() int32        { return s.generationID }
 
 func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
-	if partitions, ok := s.poms[topic]; ok {
-		if pom, ok := partitions[partition]; ok {
-			pom.MarkOffset(offset, metadata)
-		}
+	if pom := s.offsets.findPOM(topic, partition); pom != nil {
+		pom.MarkOffset(offset, metadata)
 	}
 }
 
 func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
-	if partitions, ok := s.poms[topic]; ok {
-		if pom, ok := partitions[partition]; ok {
-			pom.ResetOffset(offset, metadata)
-		}
+	if pom := s.offsets.findPOM(topic, partition); pom != nil {
+		pom.ResetOffset(offset, metadata)
 	}
 }
 
@@ -579,7 +569,10 @@ func (s *consumerGroupSession) consume(topic string, partition int32) {
 	}
 
 	// get next offset
-	offset, _ := s.poms[topic][partition].NextOffset()
+	offset := s.parent.config.Consumer.Offsets.Initial
+	if pom := s.offsets.findPOM(topic, partition); pom != nil {
+		offset, _ = pom.NextOffset()
+	}
 
 	// create new claim
 	claim, err := newConsumerGroupClaim(s, topic, partition, offset)
@@ -620,35 +613,22 @@ func (s *consumerGroupSession) Cancel() {
 	})
 }
 
-func (s *consumerGroupSession) release(cleanup bool) (err error) {
+func (s *consumerGroupSession) release(withCleanup bool) (err error) {
 	// signal release, stop heartbeat
 	s.Cancel()
 
 	// wait for consumers to exit
 	s.waitGroup.Wait()
 
-	// perform release steps
+	// perform release
 	s.releaseOnce.Do(func() {
-
-		// close partition offset managers
-		for topic, partitions := range s.poms {
-			for partition, pom := range partitions {
-				if e := pom.Close(); e != nil {
-					s.parent.handleError(err, topic, partition)
-					err = e
-				}
-			}
-		}
-
-		// perform cleanup if this is a clean release
-		if cleanup {
+		if withCleanup {
 			if e := s.handler.Cleanup(s); e != nil {
 				s.parent.handleError(err, "", -1)
 				err = e
 			}
 		}
 
-		// close offset manager
 		if e := s.offsets.Close(); e != nil {
 			err = e
 		}
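
With this change the session stops tracking its own poms map: MarkOffset, ResetOffset and consume resolve the partition offset manager through the shared offsetManager via findPOM, and consume falls back to Consumer.Offsets.Initial when no POM is registered for the claim. Below is a minimal sketch of how a handler interacts with the reworked session, written as if it sat alongside the sarama package; exampleHandler and the processing placeholder are assumptions for illustration, not part of this commit.

type exampleHandler struct{}

func (exampleHandler) Setup(ConsumerGroupSession) error   { return nil }
func (exampleHandler) Cleanup(ConsumerGroupSession) error { return nil }

func (exampleHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		// process msg ...

		// MarkOffset now looks the POM up through offsetManager.findPOM;
		// if no POM exists for this topic/partition the call is a no-op.
		sess.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, "")
	}
	return nil
}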

functional_consumer_group_test.go (+6 -2)

@@ -158,11 +158,15 @@ func testFuncConsumerGroupFuzzySeed(topic string) error {
 
 	total := int64(0)
 	for pn := int32(0); pn < 4; pn++ {
-		offset, err := client.GetOffset(topic, pn, OffsetNewest)
+		newest, err := client.GetOffset(topic, pn, OffsetNewest)
 		if err != nil {
 			return err
 		}
-		total += offset
+		oldest, err := client.GetOffset(topic, pn, OffsetOldest)
+		if err != nil {
+			return err
+		}
+		total = total + newest - oldest
 	}
 	if total >= 21000 {
 		return nil
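
The fuzz-seed check now counts fetchable messages as the gap between the newest and oldest offsets of each partition instead of the newest offset alone, so a non-zero log start offset (retention, earlier deletions) no longer inflates the total. A small sketch of that counting logic as a standalone helper; countAvailable is a hypothetical name, assuming the in-package Client interface.

func countAvailable(client Client, topic string, partitions int32) (int64, error) {
	total := int64(0)
	for pn := int32(0); pn < partitions; pn++ {
		// newest is the next offset to be produced, oldest is the log start
		// offset; their difference is the number of messages still available.
		newest, err := client.GetOffset(topic, pn, OffsetNewest)
		if err != nil {
			return 0, err
		}
		oldest, err := client.GetOffset(topic, pn, OffsetOldest)
		if err != nil {
			return 0, err
		}
		total += newest - oldest
	}
	return total, nil
}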

offset_manager.go (+25 -11)

@@ -34,7 +34,7 @@ type offsetManager struct {
 	brokerLock sync.RWMutex
 
 	poms     map[string]map[int32]*partitionOffsetManager
-	pomsLock sync.Mutex
+	pomsLock sync.RWMutex
 
 	closeOnce sync.Once
 	closing   chan none
@@ -250,20 +250,22 @@ func (om *offsetManager) constructRequest() *OffsetCommitRequest {
 		r = &OffsetCommitRequest{
 			Version:                 1,
 			ConsumerGroup:           om.group,
-			ConsumerGroupGeneration: bom.parent.generation,
+			ConsumerID:              om.memberID,
+			ConsumerGroupGeneration: om.generation,
 		}
 	} else {
 		r = &OffsetCommitRequest{
 			Version:                 2,
 			RetentionTime:           int64(om.conf.Consumer.Offsets.Retention / time.Millisecond),
+			ConsumerID:              om.memberID,
 			ConsumerGroup:           om.group,
-			ConsumerGroupGeneration: bom.parent.generation,
+			ConsumerGroupGeneration: om.generation,
 		}
 
 	}
 
-	om.pomsLock.Lock()
-	defer om.pomsLock.Unlock()
+	om.pomsLock.RLock()
+	defer om.pomsLock.RUnlock()
 
 	for _, topicManagers := range om.poms {
 		for _, pom := range topicManagers {
@@ -283,8 +285,8 @@ func (om *offsetManager) constructRequest() *OffsetCommitRequest {
 }
 
 func (om *offsetManager) handleResponse(broker *Broker, req *OffsetCommitRequest, resp *OffsetCommitResponse) {
-	om.pomsLock.Lock()
-	defer om.pomsLock.Unlock()
+	om.pomsLock.RLock()
+	defer om.pomsLock.RUnlock()
 
 	for _, topicManagers := range om.poms {
 		for _, pom := range topicManagers {
@@ -334,8 +336,8 @@ func (om *offsetManager) handleResponse(broker *Broker, req *OffsetCommitRequest
 }
 
 func (om *offsetManager) handleError(err error) {
-	om.pomsLock.Lock()
-	defer om.pomsLock.Unlock()
+	om.pomsLock.RLock()
+	defer om.pomsLock.RUnlock()
 
 	for _, topicManagers := range om.poms {
 		for _, pom := range topicManagers {
@@ -345,8 +347,8 @@ func (om *offsetManager) handleError(err error) {
 }
 
 func (om *offsetManager) asyncClosePOMs() {
-	om.pomsLock.Lock()
-	defer om.pomsLock.Unlock()
+	om.pomsLock.RLock()
+	defer om.pomsLock.RUnlock()
 
 	for _, topicManagers := range om.poms {
 		for _, pom := range topicManagers {
@@ -380,6 +382,18 @@ func (om *offsetManager) releasePOMs(force bool) (remaining int) {
 	return
 }
 
+func (om *offsetManager) findPOM(topic string, partition int32) *partitionOffsetManager {
+	om.pomsLock.RLock()
+	defer om.pomsLock.RUnlock()
+
+	if partitions, ok := om.poms[topic]; ok {
+		if pom, ok := partitions[partition]; ok {
+			return pom
+		}
+	}
+	return nil
+}
+
 // Partition Offset Manager
 
 // PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
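
The switch from sync.Mutex to sync.RWMutex lets the read-only traversals of om.poms (constructRequest, handleResponse, handleError, asyncClosePOMs and the new findPOM) run concurrently; only code that mutates the map still needs the exclusive lock. A minimal sketch of that split, assuming a hypothetical addPOM writer (in the actual code, registration presumably happens inside ManagePartition).

// addPOM is a hypothetical writer used only to illustrate the lock split:
// writers keep taking the exclusive lock while readers now share RLock.
func (om *offsetManager) addPOM(topic string, partition int32, pom *partitionOffsetManager) {
	om.pomsLock.Lock()
	defer om.pomsLock.Unlock()

	if om.poms == nil {
		om.poms = make(map[string]map[int32]*partitionOffsetManager)
	}
	if om.poms[topic] == nil {
		om.poms[topic] = make(map[int32]*partitionOffsetManager)
	}
	om.poms[topic][partition] = pom
}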