Browse Source

Deregister a partition offset manager in the OffsetManager when it is done, so we can register a new one later.

Willem van Bergen 10 years ago
parent
commit
0ae0505b85
1 changed files with 11 additions and 0 deletions
  1. 11 0
      offset_manager.go

+ 11 - 0
offset_manager.go

@@ -111,6 +111,16 @@ func (om *offsetManager) abandonBroker(bom *brokerOffsetManager) {
 	delete(om.boms, bom.broker)
 }
 
+func (om *offsetManager) abandonPartitionOffsetManager(pom *partitionOffsetManager) {
+	om.lock.Lock()
+	defer om.lock.Unlock()
+
+	delete(om.poms[pom.topic], pom.partition)
+	if len(om.poms[pom.topic]) == 0 {
+		delete(om.poms, pom.topic)
+	}
+}
+
 // Partition Offset Manager
 
 // PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
@@ -211,6 +221,7 @@ func (pom *partitionOffsetManager) mainLoop() {
 				}
 				pom.parent.unrefBrokerOffsetManager(pom.broker)
 			}
+			pom.parent.abandonPartitionOffsetManager(pom)
 			close(pom.errors)
 			return
 		}