Browse Source

Correct coordinator checkin

Dimitrij Denissenko 7 years ago
parent
commit
2a96a24d73
1 changed files with 13 additions and 19 deletions
  1. 13 19
      offset_manager.go

+ 13 - 19
offset_manager.go

@@ -105,7 +105,9 @@ func (om *offsetManager) Close() error {
 		}
 
 		om.releasePOMs(true)
-		om.releaseCoordinator()
+		om.brokerMu.Lock()
+		om.broker = nil
+		om.brokerMu.Unlock()
 	})
 	return nil
 }
@@ -129,7 +131,7 @@ func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retri
 		if retries <= 0 {
 			return 0, "", err
 		}
-		om.releaseCoordinator()
+		om.releaseCoordinator(broker)
 		return om.fetchInitialOffset(topic, partition, retries-1)
 	}
 
@@ -145,7 +147,7 @@ func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retri
 		if retries <= 0 {
 			return 0, "", block.Err
 		}
-		om.releaseCoordinator()
+		om.releaseCoordinator(broker)
 		return om.fetchInitialOffset(topic, partition, retries-1)
 	case ErrOffsetsLoadInProgress:
 		if retries <= 0 {
@@ -191,21 +193,12 @@ func (om *offsetManager) coordinator() (*Broker, error) {
 	return broker, nil
 }
 
-func (om *offsetManager) releaseCoordinator() {
+func (om *offsetManager) releaseCoordinator(b *Broker) {
 	om.brokerMu.Lock()
-	om.broker = nil
-	om.brokerMu.Unlock()
-}
-
-func (om *offsetManager) abandonCoordinator() {
-	om.brokerMu.Lock()
-	broker := om.broker
-	om.broker = nil
-	om.brokerMu.Unlock()
-
-	if broker != nil {
-		_ = broker.Close()
+	if om.broker == b {
+		om.broker = nil
 	}
+	om.brokerMu.Unlock()
 }
 
 func (om *offsetManager) mainLoop() {
@@ -237,8 +230,9 @@ func (om *offsetManager) flushToBroker() (success bool) {
 
 	response, err := broker.CommitOffset(request)
 	if err != nil {
-		om.abandonCoordinator()
 		om.handleError(err)
+		om.releaseCoordinator(broker)
+		_ = broker.Close()
 		return false
 	}
 
@@ -272,7 +266,7 @@ func (om *offsetManager) flushToBroker() (success bool) {
 			case ErrNotLeaderForPartition, ErrLeaderNotAvailable,
 				ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
 				// not a critical error, we just need to redispatch
-				om.releaseCoordinator()
+				om.releaseCoordinator(broker)
 			case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize:
 				// nothing we can do about this, just tell the user and carry on
 				success = false
@@ -290,7 +284,7 @@ func (om *offsetManager) flushToBroker() (success bool) {
 				// dunno, tell the user and try redispatching
 				success = false
 				pom.handleError(err)
-				om.releaseCoordinator()
+				om.releaseCoordinator(broker)
 			}
 		}
 	}