Browse Source

Merge pull request #658 from Shopify/offset-manager-shutdown-take-2

Fix race condition on OffsetManager shutdown
Evan Huus 9 years ago
parent
commit
2acd68e45e
1 changed files with 6 additions and 11 deletions
  1. 6 11
      offset_manager.go

+ 6 - 11
offset_manager.go

@@ -176,7 +176,7 @@ type partitionOffsetManager struct {
 	offset   int64
 	metadata string
 	dirty    bool
-	clean    chan none
+	clean    sync.Cond
 	broker   *brokerOffsetManager
 
 	errors    chan *ConsumerError
@@ -189,11 +189,11 @@ func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32
 		parent:    om,
 		topic:     topic,
 		partition: partition,
-		clean:     make(chan none),
 		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
 		rebalance: make(chan none, 1),
 		dying:     make(chan none),
 	}
+	pom.clean.L = &pom.lock
 
 	if err := pom.selectBroker(); err != nil {
 		return nil, err
@@ -331,11 +331,7 @@ func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string
 
 	if pom.offset == offset && pom.metadata == metadata {
 		pom.dirty = false
-
-		select {
-		case pom.clean <- none{}:
-		default:
-		}
+		pom.clean.Signal()
 	}
 }
 
@@ -353,11 +349,10 @@ func (pom *partitionOffsetManager) NextOffset() (int64, string) {
 func (pom *partitionOffsetManager) AsyncClose() {
 	go func() {
 		pom.lock.Lock()
-		dirty := pom.dirty
-		pom.lock.Unlock()
+		defer pom.lock.Unlock()
 
-		if dirty {
-			<-pom.clean
+		for pom.dirty {
+			pom.clean.Wait()
 		}
 
 		close(pom.dying)