Browse Source

Merge pull request #7335 from heyitsanthony/leadership-kick

grpcproxy: support forcing leader as available
Anthony Romano 8 years ago
parent
commit
49a12371c1
1 changed files with 24 additions and 4 deletions
  1. 24 4
      proxy/grpcproxy/leader.go

+ 24 - 4
proxy/grpcproxy/leader.go

@@ -63,24 +63,44 @@ func (l *leader) recvLoop() {
 		wch := l.w.Watch(l.ctx, lostLeaderKey, clientv3.WithRev(rev), clientv3.WithCreatedNotify())
 		wch := l.w.Watch(l.ctx, lostLeaderKey, clientv3.WithRev(rev), clientv3.WithCreatedNotify())
 		cresp, ok := <-wch
 		cresp, ok := <-wch
 		if !ok {
 		if !ok {
+			l.loseLeader()
 			continue
 			continue
 		}
 		}
 		if cresp.Err() != nil {
 		if cresp.Err() != nil {
+			l.loseLeader()
 			if grpc.ErrorDesc(cresp.Err()) == grpc.ErrClientConnClosing.Error() {
 			if grpc.ErrorDesc(cresp.Err()) == grpc.ErrClientConnClosing.Error() {
 				close(l.disconnc)
 				close(l.disconnc)
 				return
 				return
 			}
 			}
 			continue
 			continue
 		}
 		}
-		// leader is available
-		l.mu.Lock()
-		l.leaderc = make(chan struct{})
-		l.mu.Unlock()
+		l.gotLeader()
 		<-wch
 		<-wch
+		l.loseLeader()
+	}
+}
+
+func (l *leader) loseLeader() {
+	l.mu.RLock()
+	defer l.mu.RUnlock()
+	select {
+	case <-l.leaderc:
+	default:
 		close(l.leaderc)
 		close(l.leaderc)
 	}
 	}
 }
 }
 
 
+// gotLeader will force update the leadership status to having a leader.
+func (l *leader) gotLeader() {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	select {
+	case <-l.leaderc:
+		l.leaderc = make(chan struct{})
+	default:
+	}
+}
+
 func (l *leader) disconnectNotify() <-chan struct{} { return l.disconnc }
 func (l *leader) disconnectNotify() <-chan struct{} { return l.disconnc }
 
 
 func (l *leader) stopNotify() <-chan struct{} { return l.donec }
 func (l *leader) stopNotify() <-chan struct{} { return l.donec }