
Addressing feedback

Dimitrij Denissenko committed 7 years ago
commit 151daaac97

2 changed files with 95 additions and 91 deletions
  1. .gitignore (1 addition, 0 deletions)
  2. offset_manager.go (94 additions, 91 deletions)

+ 1 - 0
.gitignore

@@ -24,3 +24,4 @@ _testmain.go
 *.exe
 
 coverage.txt
+profile.out

+ 94 - 91
offset_manager.go

@@ -27,11 +27,11 @@ type offsetManager struct {
 	group  string
 	ticker *time.Ticker
 
-	broker   *Broker
-	brokerMu sync.RWMutex
+	broker     *Broker
+	brokerLock sync.RWMutex
 
-	poms   map[string]map[int32]*partitionOffsetManager
-	pomsMu sync.Mutex
+	poms     map[string]map[int32]*partitionOffsetManager
+	pomsLock sync.Mutex
 
 	closeOnce sync.Once
 	closing   chan none
@@ -68,8 +68,8 @@ func (om *offsetManager) ManagePartition(topic string, partition int32) (Partiti
 		return nil, err
 	}
 
-	om.pomsMu.Lock()
-	defer om.pomsMu.Unlock()
+	om.pomsLock.Lock()
+	defer om.pomsLock.Unlock()
 
 	topicManagers := om.poms[topic]
 	if topicManagers == nil {
@@ -96,7 +96,8 @@ func (om *offsetManager) Close() error {
 
 		// flush one last time
 		for retries := om.conf.Metadata.Retry.Max; true; {
-			if om.flushToBroker() {
+			om.flushToBroker()
+			if om.releasePOMs(false) == 0 {
 				break
 			}
 			if retries--; retries < 0 {
@@ -105,9 +106,9 @@ func (om *offsetManager) Close() error {
 		}
 
 		om.releasePOMs(true)
-		om.brokerMu.Lock()
+		om.brokerLock.Lock()
 		om.broker = nil
-		om.brokerMu.Unlock()
+		om.brokerLock.Unlock()
 	})
 	return nil
 }
@@ -121,12 +122,12 @@ func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retri
 		return om.fetchInitialOffset(topic, partition, retries-1)
 	}
 
-	request := new(OffsetFetchRequest)
-	request.Version = 1
-	request.ConsumerGroup = om.group
-	request.AddPartition(topic, partition)
+	req := new(OffsetFetchRequest)
+	req.Version = 1
+	req.ConsumerGroup = om.group
+	req.AddPartition(topic, partition)
 
-	response, err := broker.FetchOffset(request)
+	resp, err := broker.FetchOffset(req)
 	if err != nil {
 		if retries <= 0 {
 			return 0, "", err
@@ -135,7 +136,7 @@ func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retri
 		return om.fetchInitialOffset(topic, partition, retries-1)
 	}
 
-	block := response.GetBlock(topic, partition)
+	block := resp.GetBlock(topic, partition)
 	if block == nil {
 		return 0, "", ErrIncompleteResponse
 	}
@@ -165,16 +166,16 @@ func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retri
 }
 
 func (om *offsetManager) coordinator() (*Broker, error) {
-	om.brokerMu.RLock()
+	om.brokerLock.RLock()
 	broker := om.broker
-	om.brokerMu.RUnlock()
+	om.brokerLock.RUnlock()
 
 	if broker != nil {
 		return broker, nil
 	}
 
-	om.brokerMu.Lock()
-	defer om.brokerMu.Unlock()
+	om.brokerLock.Lock()
+	defer om.brokerLock.Unlock()
 
 	if broker := om.broker; broker != nil {
 		return broker, nil
@@ -194,11 +195,11 @@ func (om *offsetManager) coordinator() (*Broker, error) {
 }
 
 func (om *offsetManager) releaseCoordinator(b *Broker) {
-	om.brokerMu.Lock()
+	om.brokerLock.Lock()
 	if om.broker == b {
 		om.broker = nil
 	}
-	om.brokerMu.Unlock()
+	om.brokerLock.Unlock()
 }
 
 func (om *offsetManager) mainLoop() {
@@ -216,52 +217,94 @@ func (om *offsetManager) mainLoop() {
 	}
 }
 
-func (om *offsetManager) flushToBroker() (success bool) {
-	request := om.constructRequest()
-	if request == nil {
-		return true
+func (om *offsetManager) flushToBroker() {
+	req := om.constructRequest()
+	if req == nil {
+		return
 	}
 
 	broker, err := om.coordinator()
 	if err != nil {
 		om.handleError(err)
-		return false
+		return
 	}
 
-	response, err := broker.CommitOffset(request)
+	resp, err := broker.CommitOffset(req)
 	if err != nil {
 		om.handleError(err)
 		om.releaseCoordinator(broker)
 		_ = broker.Close()
-		return false
+		return
 	}
 
-	success = true
+	om.handleResponse(broker, req, resp)
+}
 
-	om.pomsMu.Lock()
-	defer om.pomsMu.Unlock()
+func (om *offsetManager) constructRequest() *OffsetCommitRequest {
+	var r *OffsetCommitRequest
+	var perPartitionTimestamp int64
+	if om.conf.Consumer.Offsets.Retention == 0 {
+		perPartitionTimestamp = ReceiveTime
+		r = &OffsetCommitRequest{
+			Version:                 1,
+			ConsumerGroup:           om.group,
+			ConsumerGroupGeneration: GroupGenerationUndefined,
+		}
+	} else {
+		r = &OffsetCommitRequest{
+			Version:                 2,
+			RetentionTime:           int64(om.conf.Consumer.Offsets.Retention / time.Millisecond),
+			ConsumerGroup:           om.group,
+			ConsumerGroupGeneration: GroupGenerationUndefined,
+		}
+
+	}
+
+	om.pomsLock.Lock()
+	defer om.pomsLock.Unlock()
+
+	for _, topicManagers := range om.poms {
+		for _, pom := range topicManagers {
+			pom.lock.Lock()
+			if pom.dirty {
+				r.AddBlock(pom.topic, pom.partition, pom.offset, perPartitionTimestamp, pom.metadata)
+			}
+			pom.lock.Unlock()
+		}
+	}
+
+	if len(r.blocks) > 0 {
+		return r
+	}
+
+	return nil
+}
+
+func (om *offsetManager) handleResponse(broker *Broker, req *OffsetCommitRequest, resp *OffsetCommitResponse) {
+	om.pomsLock.Lock()
+	defer om.pomsLock.Unlock()
 
 	for _, topicManagers := range om.poms {
 		for _, pom := range topicManagers {
-			if request.blocks[pom.topic] == nil || request.blocks[pom.topic][pom.partition] == nil {
+			if req.blocks[pom.topic] == nil || req.blocks[pom.topic][pom.partition] == nil {
 				continue
 			}
 
 			var err KError
 			var ok bool
 
-			if response.Errors[pom.topic] == nil {
+			if resp.Errors[pom.topic] == nil {
 				pom.handleError(ErrIncompleteResponse)
 				continue
 			}
-			if err, ok = response.Errors[pom.topic][pom.partition]; !ok {
+			if err, ok = resp.Errors[pom.topic][pom.partition]; !ok {
 				pom.handleError(ErrIncompleteResponse)
 				continue
 			}
 
 			switch err {
 			case ErrNoError:
-				block := request.blocks[pom.topic][pom.partition]
+				block := req.blocks[pom.topic][pom.partition]
 				pom.updateCommitted(block.offset, block.metadata)
 			case ErrNotLeaderForPartition, ErrLeaderNotAvailable,
 				ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
@@ -269,71 +312,28 @@ func (om *offsetManager) flushToBroker() (success bool) {
 				om.releaseCoordinator(broker)
 			case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize:
 				// nothing we can do about this, just tell the user and carry on
-				success = false
 				pom.handleError(err)
 			case ErrOffsetsLoadInProgress:
 				// nothing wrong but we didn't commit, we'll get it next time round
 				break
 			case ErrUnknownTopicOrPartition:
 				// let the user know *and* try redispatching - if topic-auto-create is
-				// enabled, redispatching should trigger a metadata request and create the
+				// enabled, redispatching should trigger a metadata req and create the
 				// topic; if not then re-dispatching won't help, but we've let the user
 				// know and it shouldn't hurt either (see https://github.com/Shopify/sarama/issues/706)
 				fallthrough
 			default:
 				// dunno, tell the user and try redispatching
-				success = false
 				pom.handleError(err)
 				om.releaseCoordinator(broker)
 			}
 		}
 	}
-	return
-}
-
-func (om *offsetManager) constructRequest() *OffsetCommitRequest {
-	var r *OffsetCommitRequest
-	var perPartitionTimestamp int64
-	if om.conf.Consumer.Offsets.Retention == 0 {
-		perPartitionTimestamp = ReceiveTime
-		r = &OffsetCommitRequest{
-			Version:                 1,
-			ConsumerGroup:           om.group,
-			ConsumerGroupGeneration: GroupGenerationUndefined,
-		}
-	} else {
-		r = &OffsetCommitRequest{
-			Version:                 2,
-			RetentionTime:           int64(om.conf.Consumer.Offsets.Retention / time.Millisecond),
-			ConsumerGroup:           om.group,
-			ConsumerGroupGeneration: GroupGenerationUndefined,
-		}
-
-	}
-
-	om.pomsMu.Lock()
-	defer om.pomsMu.Unlock()
-
-	for _, topicManagers := range om.poms {
-		for _, pom := range topicManagers {
-			pom.lock.Lock()
-			if pom.dirty {
-				r.AddBlock(pom.topic, pom.partition, pom.offset, perPartitionTimestamp, pom.metadata)
-			}
-			pom.lock.Unlock()
-		}
-	}
-
-	if len(r.blocks) > 0 {
-		return r
-	}
-
-	return nil
 }
 
 func (om *offsetManager) handleError(err error) {
-	om.pomsMu.Lock()
-	defer om.pomsMu.Unlock()
+	om.pomsLock.Lock()
+	defer om.pomsLock.Unlock()
 
 	for _, topicManagers := range om.poms {
 		for _, pom := range topicManagers {
@@ -343,8 +343,8 @@ func (om *offsetManager) handleError(err error) {
 }
 
 func (om *offsetManager) asyncClosePOMs() {
-	om.pomsMu.Lock()
-	defer om.pomsMu.Unlock()
+	om.pomsLock.Lock()
+	defer om.pomsLock.Unlock()
 
 	for _, topicManagers := range om.poms {
 		for _, pom := range topicManagers {
@@ -353,12 +353,13 @@ func (om *offsetManager) asyncClosePOMs() {
 	}
 }
 
-func (om *offsetManager) releasePOMs(force bool) {
-	om.pomsMu.Lock()
-	defer om.pomsMu.Unlock()
+// Releases/removes closed POMs once they are clean (or when forced)
+func (om *offsetManager) releasePOMs(force bool) (remaining int) {
+	om.pomsLock.Lock()
+	defer om.pomsLock.Unlock()
 
-	for _, topicManagers := range om.poms {
-		for _, pom := range topicManagers {
+	for topic, topicManagers := range om.poms {
+		for partition, pom := range topicManagers {
 			pom.lock.Lock()
 			releaseDue := pom.done && (force || !pom.dirty)
 			pom.lock.Unlock()
@@ -366,13 +367,15 @@ func (om *offsetManager) releasePOMs(force bool) {
 			if releaseDue {
 				pom.release()
 
-				delete(om.poms[pom.topic], pom.partition)
-				if len(om.poms[pom.topic]) == 0 {
-					delete(om.poms, pom.topic)
+				delete(om.poms[topic], partition)
+				if len(om.poms[topic]) == 0 {
+					delete(om.poms, topic)
 				}
 			}
 		}
+		remaining += len(om.poms[topic])
 	}
+	return
 }
 
 // Partition Offset Manager
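
Below is a minimal, self-contained sketch (not part of the commit) of the retry pattern that the rewritten Close uses: flush whatever is dirty, then ask how many partition offset managers are still outstanding, and only give up after the configured number of retries. The manager type and its methods here are hypothetical stand-ins for offsetManager, flushToBroker and releasePOMs, since flushToBroker no longer reports success itself.

// sketch.go — assumes nothing from sarama; all names are illustrative
package main

import "fmt"

type manager struct {
	dirty int // pretend count of dirty partition offset managers
}

// flush commits what it can; mirrors flushToBroker, which now returns nothing.
func (m *manager) flush() {
	if m.dirty > 0 {
		m.dirty-- // pretend one partition was committed this round
	}
}

// release drops clean (or, when forced, all) managers and reports how many
// remain; mirrors releasePOMs(force).
func (m *manager) release(force bool) int {
	if force {
		m.dirty = 0
	}
	return m.dirty
}

func main() {
	m := &manager{dirty: 3}
	maxRetries := 3 // stands in for conf.Metadata.Retry.Max

	// flush one last time, exactly like the rewritten Close loop
	for retries := maxRetries; true; {
		m.flush()
		if m.release(false) == 0 {
			break
		}
		if retries--; retries < 0 {
			break
		}
	}
	m.release(true) // force-release whatever is left

	fmt.Println("remaining:", m.release(false))
}

The design point this illustrates: success is no longer decided inside flushToBroker; instead releasePOMs(false) acts as the progress check, so a close can finish as soon as every partition offset manager is clean, regardless of which individual commits succeeded.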