Sfoglia il codice sorgente

RefreshCoordinator() will handle closing the coordinator if necessary

Tom Lee 7 anni fa
parent
commit
4ff43dddea
1 ha cambiato i file con 0 aggiunte e 4 eliminazioni
  1. 0 4
      consumer_group.go

+ 0 - 4
consumer_group.go

@@ -221,8 +221,6 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
 			return nil, join.Err
 			return nil, join.Err
 		}
 		}
 
 
-		_ = coordinator.Close()
-
 		return c.retryNewSession(ctx, topics, handler, retries, true)
 		return c.retryNewSession(ctx, topics, handler, retries, true)
 	case ErrRebalanceInProgress: // retry after backoff
 	case ErrRebalanceInProgress: // retry after backoff
 		if retries <= 0 {
 		if retries <= 0 {
@@ -264,8 +262,6 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
 			return nil, sync.Err
 			return nil, sync.Err
 		}
 		}
 
 
-		_ = coordinator.Close()
-
 		return c.retryNewSession(ctx, topics, handler, retries, true)
 		return c.retryNewSession(ctx, topics, handler, retries, true)
 	case ErrRebalanceInProgress: // retry after backoff
 	case ErrRebalanceInProgress: // retry after backoff
 		if retries <= 0 {
 		if retries <= 0 {