|
|
@@ -162,14 +162,8 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- // Get coordinator
|
|
|
- coordinator, err := c.client.Coordinator(c.groupID)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
-
|
|
|
// Init session
|
|
|
- sess, err := c.newSession(ctx, coordinator, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
|
|
|
+ sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
|
|
|
if err == ErrClosedClient {
|
|
|
return ErrClosedConsumerGroup
|
|
|
} else if err != nil {
|
|
|
@@ -183,7 +177,33 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co
|
|
|
return sess.release(true)
|
|
|
}
|
|
|
|
|
|
-func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
|
|
|
+func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
|
|
|
+ select {
|
|
|
+ case <-c.closed:
|
|
|
+ return nil, ErrClosedConsumerGroup
|
|
|
+ case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
|
|
|
+ }
|
|
|
+
|
|
|
+ if refreshCoordinator {
|
|
|
+ err := c.client.RefreshCoordinator(c.groupID)
|
|
|
+ if err != nil {
|
|
|
+ return c.retryNewSession(ctx, topics, handler, retries, true)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return c.newSession(ctx, topics, handler, retries-1)
|
|
|
+}
|
|
|
+
|
|
|
+func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
|
|
|
+ coordinator, err := c.client.Coordinator(c.groupID)
|
|
|
+ if err != nil {
|
|
|
+ if retries <= 0 {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
+ return c.retryNewSession(ctx, topics, handler, retries, true)
|
|
|
+ }
|
|
|
+
|
|
|
// Join consumer group
|
|
|
join, err := c.joinGroupRequest(coordinator, topics)
|
|
|
if err != nil {
|
|
|
@@ -195,19 +215,21 @@ func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, top
|
|
|
c.memberID = join.MemberId
|
|
|
case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
|
|
|
c.memberID = ""
|
|
|
- return c.newSession(ctx, coordinator, topics, handler, retries)
|
|
|
- case ErrRebalanceInProgress: // retry after backoff
|
|
|
+ return c.newSession(ctx, topics, handler, retries)
|
|
|
+ case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
|
|
|
if retries <= 0 {
|
|
|
return nil, join.Err
|
|
|
}
|
|
|
|
|
|
- select {
|
|
|
- case <-c.closed:
|
|
|
- return nil, ErrClosedConsumerGroup
|
|
|
- case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
|
|
|
+ _ = coordinator.Close()
|
|
|
+
|
|
|
+ return c.retryNewSession(ctx, topics, handler, retries, true)
|
|
|
+ case ErrRebalanceInProgress: // retry after backoff
|
|
|
+ if retries <= 0 {
|
|
|
+ return nil, join.Err
|
|
|
}
|
|
|
|
|
|
- return c.newSession(ctx, coordinator, topics, handler, retries-1)
|
|
|
+ return c.retryNewSession(ctx, topics, handler, retries, false)
|
|
|
default:
|
|
|
return nil, join.Err
|
|
|
}
|
|
|
@@ -236,19 +258,21 @@ func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, top
|
|
|
case ErrNoError:
|
|
|
case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
|
|
|
c.memberID = ""
|
|
|
- return c.newSession(ctx, coordinator, topics, handler, retries)
|
|
|
- case ErrRebalanceInProgress: // retry after backoff
|
|
|
+ return c.newSession(ctx, topics, handler, retries)
|
|
|
+ case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
|
|
|
if retries <= 0 {
|
|
|
return nil, sync.Err
|
|
|
}
|
|
|
|
|
|
- select {
|
|
|
- case <-c.closed:
|
|
|
- return nil, ErrClosedConsumerGroup
|
|
|
- case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
|
|
|
+ _ = coordinator.Close()
|
|
|
+
|
|
|
+ return c.retryNewSession(ctx, topics, handler, retries, true)
|
|
|
+ case ErrRebalanceInProgress: // retry after backoff
|
|
|
+ if retries <= 0 {
|
|
|
+ return nil, sync.Err
|
|
|
}
|
|
|
|
|
|
- return c.newSession(ctx, coordinator, topics, handler, retries-1)
|
|
|
+ return c.retryNewSession(ctx, topics, handler, retries, false)
|
|
|
default:
|
|
|
return nil, sync.Err
|
|
|
}
|