Browse Source

Don't shutdown until the latest offset was successfully committed

Evan Huus 10 years ago
parent
commit
e7d8e17d23
1 changed files with 18 additions and 1 deletions
  1. 18 1
      offset_manager.go

+ 18 - 1
offset_manager.go

@@ -141,6 +141,7 @@ type partitionOffsetManager struct {
 	offset   int64
 	offset   int64
 	metadata string
 	metadata string
 	dirty    bool
 	dirty    bool
+	clean    chan none
 	broker   *brokerOffsetManager
 	broker   *brokerOffsetManager
 
 
 	errors    chan *ConsumerError
 	errors    chan *ConsumerError
@@ -153,6 +154,7 @@ func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32
 		parent:    om,
 		parent:    om,
 		topic:     topic,
 		topic:     topic,
 		partition: partition,
 		partition: partition,
+		clean:     make(chan none),
 		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
 		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
 		rebalance: make(chan none, 1),
 		rebalance: make(chan none, 1),
 		dying:     make(chan none),
 		dying:     make(chan none),
@@ -288,6 +290,11 @@ func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string
 
 
 	if pom.offset == offset && pom.metadata == metadata {
 	if pom.offset == offset && pom.metadata == metadata {
 		pom.dirty = false
 		pom.dirty = false
+
+		select {
+		case pom.clean <- none{}:
+		default:
+		}
 	}
 	}
 }
 }
 
 
@@ -299,7 +306,17 @@ func (pom *partitionOffsetManager) Offset() (int64, string) {
 }
 }
 
 
 func (pom *partitionOffsetManager) AsyncClose() {
 func (pom *partitionOffsetManager) AsyncClose() {
-	close(pom.dying)
+	go func() {
+		pom.lock.Lock()
+		dirty := pom.dirty
+		pom.lock.Unlock()
+
+		if dirty {
+			<-pom.clean
+		}
+
+		close(pom.dying)
+	}()
 }
 }
 
 
 func (pom *partitionOffsetManager) Close() error {
 func (pom *partitionOffsetManager) Close() error {