Pārlūkot izejas kodu

Merge pull request #4791 from xiang90/l

integration: add TestV3LeaseFailover test
Xiang Li 9 gadi atpakaļ
vecāks
revīzija
179dc72fa7
3 mainītis faili ar 84 papildinājumiem un 0 dzēšanām
  1. 59 0
      integration/v3_lease_test.go
  2. 4 0
      rafthttp/peer.go
  3. 21 0
      rafthttp/stream.go

+ 59 - 0
integration/v3_lease_test.go

@@ -306,6 +306,65 @@ func TestV3LeaseSwitch(t *testing.T) {
 	}
 }
 
+// TestV3LeaseFailover ensures the old leader drops lease keepalive requests within
+// election timeout after it loses its quorum. And the new leader extends the TTL of
+// the lease to at least TTL + election timeout.
+func TestV3LeaseFailover(t *testing.T) {
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	toIsolate := clus.waitLeader(t, clus.Members)
+
+	lc := toGRPC(clus.Client(toIsolate)).Lease
+
+	// create lease
+	lresp, err := lc.LeaseCreate(context.TODO(), &pb.LeaseCreateRequest{TTL: 5})
+	if err != nil {
+		t.Fatal(err)
+	}
+	if lresp.Error != "" {
+		t.Fatal(lresp.Error)
+	}
+
+	// isolate the current leader with its followers.
+	clus.Members[toIsolate].Pause()
+
+	lreq := &pb.LeaseKeepAliveRequest{ID: lresp.ID}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	lac, err := lc.LeaseKeepAlive(ctx)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer lac.CloseSend()
+
+	// send keep alive to old leader until the old leader starts
+	// to drop lease request.
+	var expectedExp time.Time
+	for {
+		if err = lac.Send(lreq); err != nil {
+			break
+		}
+		lkresp, rxerr := lac.Recv()
+		if rxerr != nil {
+			break
+		}
+		expectedExp = time.Now().Add(time.Duration(lkresp.TTL) * time.Second)
+		time.Sleep(time.Duration(lkresp.TTL/2) * time.Second)
+	}
+
+	clus.Members[toIsolate].Resume()
+	clus.waitLeader(t, clus.Members)
+
+	// lease should not expire at the last received expire deadline.
+	time.Sleep(expectedExp.Sub(time.Now()) - 500*time.Millisecond)
+
+	if !leaseExist(t, clus, lresp.ID) {
+		t.Error("unexpected lease not exists")
+	}
+}
+
 // acquireLeaseAndKey creates a new lease and creates an attached key.
 func acquireLeaseAndKey(clus *ClusterV3, key string) (int64, error) {
 	// create lease

+ 4 - 0
rafthttp/peer.go

@@ -217,6 +217,8 @@ func (p *peer) Pause() {
 	p.mu.Lock()
 	defer p.mu.Unlock()
 	p.paused = true
+	p.msgAppReader.pause()
+	p.msgAppV2Reader.pause()
 }
 
 // Resume resumes a paused peer.
@@ -224,6 +226,8 @@ func (p *peer) Resume() {
 	p.mu.Lock()
 	defer p.mu.Unlock()
 	p.paused = false
+	p.msgAppReader.resume()
+	p.msgAppV2Reader.resume()
 }
 
 func (p *peer) stop() {

+ 21 - 0
rafthttp/stream.go

@@ -252,6 +252,7 @@ type streamReader struct {
 	errorc        chan<- error
 
 	mu     sync.Mutex
+	paused bool
 	cancel func()
 	closer io.Closer
 	stopc  chan struct{}
@@ -331,6 +332,14 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
 			return err
 		}
 
+		cr.mu.Lock()
+		paused := cr.paused
+		cr.mu.Unlock()
+
+		if paused {
+			continue
+		}
+
 		if isLinkHeartbeatMessage(m) {
 			// raft is not interested in link layer
 			// heartbeat message, so we should ignore
@@ -463,6 +472,18 @@ func (cr *streamReader) close() {
 	cr.closer = nil
 }
 
+func (cr *streamReader) pause() {
+	cr.mu.Lock()
+	defer cr.mu.Unlock()
+	cr.paused = true
+}
+
+func (cr *streamReader) resume() {
+	cr.mu.Lock()
+	defer cr.mu.Unlock()
+	cr.paused = false
+}
+
 func isClosedConnectionError(err error) bool {
 	operr, ok := err.(*net.OpError)
 	return ok && operr.Err.Error() == "use of closed network connection"