Browse Source

server: broadcast leader changed

nolouch 7 years ago
parent
commit
c15fb607f6

+ 2 - 2
clientv3/integration/network_partition_test.go

@@ -303,14 +303,14 @@ func TestDropReadUnderNetworkPartition(t *testing.T) {
 
 
 	clus.Members[leaderIndex].InjectPartition(t, clus.Members[(leaderIndex+1)%3], clus.Members[(leaderIndex+2)%3])
 	clus.Members[leaderIndex].InjectPartition(t, clus.Members[(leaderIndex+1)%3], clus.Members[(leaderIndex+2)%3])
 	kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn), nil)
 	kvc := clientv3.NewKVFromKVClient(pb.NewKVClient(conn), nil)
-	ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
+	ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
 	_, err = kvc.Get(ctx, "a")
 	_, err = kvc.Get(ctx, "a")
 	cancel()
 	cancel()
 	if err != rpctypes.ErrLeaderChanged {
 	if err != rpctypes.ErrLeaderChanged {
 		t.Fatalf("expected %v, got %v", rpctypes.ErrLeaderChanged, err)
 		t.Fatalf("expected %v, got %v", rpctypes.ErrLeaderChanged, err)
 	}
 	}
 
 
-	ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
+	ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
 	_, err = kvc.Get(ctx, "a")
 	_, err = kvc.Get(ctx, "a")
 	cancel()
 	cancel()
 	if err != nil {
 	if err != nil {

+ 14 - 6
etcdserver/server.go

@@ -213,7 +213,8 @@ type EtcdServer struct {
 	// done is closed when all goroutines from start() complete.
 	// done is closed when all goroutines from start() complete.
 	done chan struct{}
 	done chan struct{}
 	// leaderChanged is used to notify the linearizable read loop to drop the old read requests.
 	// leaderChanged is used to notify the linearizable read loop to drop the old read requests.
-	leaderChanged chan struct{}
+	leaderChanged   chan struct{}
+	leaderChangedMu sync.RWMutex
 
 
 	errorc     chan error
 	errorc     chan error
 	id         types.ID
 	id         types.ID
@@ -754,7 +755,7 @@ func (s *EtcdServer) start() {
 	s.ctx, s.cancel = context.WithCancel(context.Background())
 	s.ctx, s.cancel = context.WithCancel(context.Background())
 	s.readwaitc = make(chan struct{}, 1)
 	s.readwaitc = make(chan struct{}, 1)
 	s.readNotifier = newNotifier()
 	s.readNotifier = newNotifier()
-	s.leaderChanged = make(chan struct{}, 1)
+	s.leaderChanged = make(chan struct{})
 	if s.ClusterVersion() != nil {
 	if s.ClusterVersion() != nil {
 		if lg != nil {
 		if lg != nil {
 			lg.Info(
 			lg.Info(
@@ -942,10 +943,11 @@ func (s *EtcdServer) run() {
 				}
 				}
 			}
 			}
 			if newLeader {
 			if newLeader {
-				select {
-				case s.leaderChanged <- struct{}{}:
-				default:
-				}
+				s.leaderChangedMu.Lock()
+				lc := s.leaderChanged
+				s.leaderChanged = make(chan struct{})
+				s.leaderChangedMu.Unlock()
+				close(lc)
 			}
 			}
 			// TODO: remove the nil checking
 			// TODO: remove the nil checking
 			// current test utility does not provide the stats
 			// current test utility does not provide the stats
@@ -1696,6 +1698,12 @@ func (s *EtcdServer) getLead() uint64 {
 	return atomic.LoadUint64(&s.lead)
 	return atomic.LoadUint64(&s.lead)
 }
 }
 
 
+func (s *EtcdServer) leaderChangedNotify() <-chan struct{} {
+	s.leaderChangedMu.RLock()
+	defer s.leaderChangedMu.RUnlock()
+	return s.leaderChanged
+}
+
 // RaftStatusGetter represents etcd server and Raft progress.
 // RaftStatusGetter represents etcd server and Raft progress.
 type RaftStatusGetter interface {
 type RaftStatusGetter interface {
 	ID() types.ID
 	ID() types.ID

+ 3 - 3
etcdserver/v3_server.go

@@ -634,9 +634,9 @@ func (s *EtcdServer) linearizableReadLoop() {
 		ctxToSend := make([]byte, 8)
 		ctxToSend := make([]byte, 8)
 		id1 := s.reqIDGen.Next()
 		id1 := s.reqIDGen.Next()
 		binary.BigEndian.PutUint64(ctxToSend, id1)
 		binary.BigEndian.PutUint64(ctxToSend, id1)
-
+		leaderChangedNotifier := s.leaderChangedNotify()
 		select {
 		select {
-		case <-s.leaderChanged:
+		case <-leaderChangedNotifier:
 			continue
 			continue
 		case <-s.readwaitc:
 		case <-s.readwaitc:
 		case <-s.stopping:
 		case <-s.stopping:
@@ -694,7 +694,7 @@ func (s *EtcdServer) linearizableReadLoop() {
 					}
 					}
 					slowReadIndex.Inc()
 					slowReadIndex.Inc()
 				}
 				}
-			case <-s.leaderChanged:
+			case <-leaderChangedNotifier:
 				timeout = true
 				timeout = true
 				readIndexFailed.Inc()
 				readIndexFailed.Inc()
 				// return a retryable error.
 				// return a retryable error.

+ 4 - 2
integration/v3_lease_test.go

@@ -565,12 +565,14 @@ func TestV3LeaseFailover(t *testing.T) {
 	md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
 	md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader)
 	mctx := metadata.NewOutgoingContext(context.Background(), md)
 	mctx := metadata.NewOutgoingContext(context.Background(), md)
 	ctx, cancel := context.WithCancel(mctx)
 	ctx, cancel := context.WithCancel(mctx)
-	defer cancel()
 	lac, err := lc.LeaseKeepAlive(ctx)
 	lac, err := lc.LeaseKeepAlive(ctx)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	defer lac.CloseSend()
+	defer func() {
+		lac.CloseSend()
+		cancel()
+	}()
 
 
 	// send keep alive to old leader until the old leader starts
 	// send keep alive to old leader until the old leader starts
 	// to drop lease request.
 	// to drop lease request.