|
|
@@ -63,24 +63,44 @@ func (l *leader) recvLoop() {
|
|
|
wch := l.w.Watch(l.ctx, lostLeaderKey, clientv3.WithRev(rev), clientv3.WithCreatedNotify())
|
|
|
cresp, ok := <-wch
|
|
|
if !ok {
|
|
|
+ l.loseLeader()
|
|
|
continue
|
|
|
}
|
|
|
if cresp.Err() != nil {
|
|
|
+ l.loseLeader()
|
|
|
if grpc.ErrorDesc(cresp.Err()) == grpc.ErrClientConnClosing.Error() {
|
|
|
close(l.disconnc)
|
|
|
return
|
|
|
}
|
|
|
continue
|
|
|
}
|
|
|
- // leader is available
|
|
|
- l.mu.Lock()
|
|
|
- l.leaderc = make(chan struct{})
|
|
|
- l.mu.Unlock()
|
|
|
+ l.gotLeader()
|
|
|
<-wch
|
|
|
+ l.loseLeader()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (l *leader) loseLeader() {
|
|
|
+ l.mu.RLock()
|
|
|
+ defer l.mu.RUnlock()
|
|
|
+ select {
|
|
|
+ case <-l.leaderc:
|
|
|
+ default:
|
|
|
close(l.leaderc)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// gotLeader will force update the leadership status to having a leader.
|
|
|
+func (l *leader) gotLeader() {
|
|
|
+ l.mu.Lock()
|
|
|
+ defer l.mu.Unlock()
|
|
|
+ select {
|
|
|
+ case <-l.leaderc:
|
|
|
+ l.leaderc = make(chan struct{})
|
|
|
+ default:
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func (l *leader) disconnectNotify() <-chan struct{} { return l.disconnc }
|
|
|
|
|
|
func (l *leader) stopNotify() <-chan struct{} { return l.donec }
|