Browse Source

Simplify code. Run heartbeat for as long as possible

Dimitrij Denissenko 7 years ago
parent
commit
c80d82cf6c
2 changed files with 92 additions and 102 deletions
  1. 1 1
      config.go
  2. 91 101
      consumer_group.go

+ 1 - 1
config.go

@@ -201,7 +201,7 @@ type Config struct {
 				// The maximum allowed time for each worker to join the group once a rebalance has begun.
 				// This is basically a limit on the amount of time needed for all tasks to flush any pending
 				// data and commit offsets. If the timeout is exceeded, then the worker will be removed from
-				// the group, which will cause offset commit failures (default 60s.
+				// the group, which will cause offset commit failures (default 60s).
 				Timeout time.Duration
 
 				Retry struct {

+ 91 - 101
consumer_group.go

@@ -49,9 +49,7 @@ type consumerGroup struct {
 	memberID string
 	errors   chan error
 
-	session *consumerGroupSession
-	lock    sync.Mutex
-
+	lock      sync.Mutex
 	closed    chan none
 	closeOnce sync.Once
 }
@@ -108,14 +106,6 @@ func (c *consumerGroup) Close() (err error) {
 		c.lock.Lock()
 		defer c.lock.Unlock()
 
-		// stop session
-		if c.session != nil {
-			if e := c.session.release(true); e != nil {
-				err = e
-			}
-			c.session = nil
-		}
-
 		// leave group
 		if e := c.leave(); e != nil {
 			err = e
@@ -146,69 +136,41 @@ func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler Co
 	default:
 	}
 
-	// Quick exit when no topics are provided
-	if len(topics) == 0 {
-		return fmt.Errorf("no topics provided")
-	}
-
-	// Start session
-	sess, err := c.startSession(ctx, topics, handler)
-	if err == ErrClosedClient {
-		return ErrClosedConsumerGroup
-	} else if err != nil {
-		return err
-	}
-
-	// Wait for session exit signal
-	<-sess.ctx.Done()
-
-	// Gracefully release session claims
-	err = sess.release(true)
-	c.lock.Lock()
-	if c.session == sess {
-		c.session = nil
-	}
-	c.lock.Unlock()
-	return err
-}
-
-func (c *consumerGroup) startSession(ctx context.Context, topics []string, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
 	c.lock.Lock()
 	defer c.lock.Unlock()
 
-	if c.session != nil {
-		return nil, fmt.Errorf("another consumer group session already in progress")
+	// Quick exit when no topics are provided
+	if len(topics) == 0 {
+		return fmt.Errorf("no topics provided")
 	}
 
+	// Refresh metadata for requested topics
 	if err := c.client.RefreshMetadata(topics...); err != nil {
-		return nil, err
-	}
-	if err := c.client.RefreshCoordinator(c.groupID); err != nil {
-		return nil, err
+		return err
 	}
 
+	// Get coordinator
 	coordinator, err := c.client.Coordinator(c.groupID)
 	if err != nil {
-		return nil, err
+		return err
 	}
 
+	// Init session
 	sess, err := c.newSession(ctx, coordinator, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
-	if err != nil {
-		return nil, err
+	if err == ErrClosedClient {
+		return ErrClosedConsumerGroup
+	} else if err != nil {
+		return err
 	}
 
-	// Store session
-	c.session = sess
-	return sess, nil
+	// Wait for session exit signal
+	<-sess.ctx.Done()
+
+	// Gracefully release session claims
+	return sess.release(true)
 }
 
 func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
-	select {
-	case <-c.closed:
-		return nil, ErrClosedConsumerGroup
-	default:
-	}
-
 	// Join consumer group
 	join, err := c.joinGroupRequest(coordinator, topics)
 	if err != nil {
@@ -218,9 +180,21 @@ func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, top
 	switch join.Err {
 	case ErrNoError:
 		c.memberID = join.MemberId
-	case ErrUnknownMemberId: // reset member ID and retry
+	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
 		c.memberID = ""
 		return c.newSession(ctx, coordinator, topics, handler, retries)
+	case ErrRebalanceInProgress: // retry after backoff
+		if retries <= 0 {
+			return nil, join.Err
+		}
+
+		select {
+		case <-c.closed:
+			return nil, ErrClosedConsumerGroup
+		case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
+		}
+
+		return c.newSession(ctx, coordinator, topics, handler, retries-1)
 	default:
 		return nil, join.Err
 	}
@@ -247,7 +221,10 @@ func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, top
 	}
 	switch sync.Err {
 	case ErrNoError:
-	case ErrRebalanceInProgress:
+	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
+		c.memberID = ""
+		return c.newSession(ctx, coordinator, topics, handler, retries)
+	case ErrRebalanceInProgress: // retry after backoff
 		if retries <= 0 {
 			return nil, sync.Err
 		}
@@ -321,6 +298,16 @@ func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrate
 	return coordinator.SyncGroup(req)
 }
 
+func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, generationID int32) (*HeartbeatResponse, error) {
+	req := &HeartbeatRequest{
+		GroupId:      c.groupID,
+		MemberId:     memberID,
+		GenerationId: generationID,
+	}
+
+	return coordinator.Heartbeat(req)
+}
+
 func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
 	topics := make(map[string][]int32)
 	for _, meta := range members {
@@ -341,30 +328,6 @@ func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata)
 	return strategy.Plan(members, topics)
 }
 
-func (c *consumerGroup) heartbeat(memberID string, generationID int32) error {
-	coordinator, err := c.client.Coordinator(c.groupID)
-	if err != nil {
-		return err
-	}
-
-	resp, err := coordinator.Heartbeat(&HeartbeatRequest{
-		GroupId:      c.groupID,
-		MemberId:     memberID,
-		GenerationId: generationID,
-	})
-	if err != nil {
-		_ = coordinator.Close()
-		return err
-	}
-
-	switch resp.Err {
-	case ErrNoError:
-		return nil
-	default:
-		return resp.Err
-	}
-}
-
 // Leaves the cluster, called by Close, protected by lock.
 func (c *consumerGroup) leave() error {
 	if c.memberID == "" {
@@ -469,8 +432,9 @@ type consumerGroupSession struct {
 	ctx     context.Context
 	cancel  func()
 
-	waitGroup   sync.WaitGroup
-	releaseOnce sync.Once
+	waitGroup       sync.WaitGroup
+	releaseOnce     sync.Once
+	hbDying, hbDead chan none
 }
 
 func newConsumerGroupSession(parent *consumerGroup, ctx context.Context, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
@@ -481,9 +445,6 @@ func newConsumerGroupSession(parent *consumerGroup, ctx context.Context, claims
 	}
 
 	// init context
-	if ctx == nil {
-		ctx = context.Background()
-	}
 	ctx, cancel := context.WithCancel(ctx)
 
 	// init session
@@ -496,10 +457,11 @@ func newConsumerGroupSession(parent *consumerGroup, ctx context.Context, claims
 		claims:       claims,
 		ctx:          ctx,
 		cancel:       cancel,
+		hbDying:      make(chan none),
+		hbDead:       make(chan none),
 	}
 
 	// start heartbeat loop
-	sess.waitGroup.Add(1)
 	go sess.heartbeatLoop()
 
 	// create a POM for each claim
@@ -614,7 +576,6 @@ func (s *consumerGroupSession) consume(topic string, partition int32) {
 	for _, err := range claim.waitClosed() {
 		s.parent.handleError(err, topic, partition)
 	}
-	return
 }
 
 func (s *consumerGroupSession) release(withCleanup bool) (err error) {
@@ -636,30 +597,59 @@ func (s *consumerGroupSession) release(withCleanup bool) (err error) {
 		if e := s.offsets.Close(); e != nil {
 			err = e
 		}
+
+		close(s.hbDying)
+		<-s.hbDead
 	})
 
 	return
 }
 
 func (s *consumerGroupSession) heartbeatLoop() {
-	defer s.waitGroup.Done()
+	defer close(s.hbDead)
 	defer s.cancel() // trigger the end of the session on exit
 
-	heartbeat := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
-	defer heartbeat.Stop()
+	pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
+	defer pause.Stop()
 
+	retries := s.parent.config.Metadata.Retry.Max
 	for {
-		select {
-		case <-heartbeat.C:
-			if err := s.parent.heartbeat(s.memberID, s.generationID); err != nil {
-				switch err {
-				case ErrRebalanceInProgress:
-				default:
-					s.parent.handleError(err, "", -1)
-				}
+		coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
+		if err != nil {
+			if retries <= 0 {
+				s.parent.handleError(err, "", -1)
 				return
 			}
-		case <-s.ctx.Done():
+
+			select {
+			case <-s.hbDying:
+				return
+			case <-time.After(s.parent.config.Metadata.Retry.Backoff):
+				retries--
+			}
+			continue
+		}
+
+		resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID)
+		if err != nil {
+			_ = coordinator.Close()
+			retries--
+			continue
+		}
+
+		switch resp.Err {
+		case ErrNoError:
+			retries = s.parent.config.Metadata.Retry.Max
+		case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration:
+			return
+		default:
+			s.parent.handleError(err, "", -1)
+			return
+		}
+
+		select {
+		case <-pause.C:
+		case <-s.hbDying:
 			return
 		}
 	}