瀏覽代碼

etcdserver: add timeout for processing v3 request

Xiang Li 9 年之前
父節點
當前提交
6ee5f9c677
共有 2 個文件被更改,包括 45 次插入3 次删除
  1. 9 3
      etcdserver/v3_server.go
  2. 36 0
      integration/v3_grpc_test.go

+ 9 - 3
etcdserver/v3_server.go

@@ -30,6 +30,9 @@ const (
 	// accept large request which might block raft stream. User
 	// specify a large value might end up with shooting in the foot.
 	maxRequestBytes = 1.5 * 1024 * 1024
+
+	// max timeout for waiting a v3 request to go through raft.
+	maxV3RequestTimeout = 5 * time.Second
 )
 
 type RaftKV interface {
@@ -283,14 +286,17 @@ func (s *EtcdServer) processInternalRaftRequest(ctx context.Context, r pb.Intern
 
 	ch := s.w.Register(r.ID)
 
-	s.r.Propose(ctx, data)
+	cctx, cancel := context.WithTimeout(ctx, maxV3RequestTimeout)
+	defer cancel()
+
+	s.r.Propose(cctx, data)
 
 	select {
 	case x := <-ch:
 		return x.(*applyResult), nil
-	case <-ctx.Done():
+	case <-cctx.Done():
 		s.w.Trigger(r.ID, nil) // GC wait
-		return nil, ctx.Err()
+		return nil, cctx.Err()
 	case <-s.done:
 		return nil, ErrStopped
 	}

+ 36 - 0
integration/v3_grpc_test.go

@@ -16,6 +16,7 @@ package integration
 
 import (
 	"fmt"
+	"math/rand"
 	"reflect"
 	"testing"
 	"time"
@@ -74,6 +75,41 @@ func TestV3PutOverwrite(t *testing.T) {
 	}
 }
 
+// TestPutRestart checks if a put after an unrelated member restart succeeds
+func TestV3PutRestart(t *testing.T) {
+	// this test might block for 5 seconds, make it parallel to speed up the test.
+	t.Parallel()
+
+	defer testutil.AfterTest(t)
+	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
+	defer clus.Terminate(t)
+
+	kvIdx := rand.Intn(3)
+	kvc := toGRPC(clus.Client(kvIdx)).KV
+
+	stopIdx := kvIdx
+	for stopIdx == kvIdx {
+		stopIdx = rand.Intn(3)
+	}
+
+	clus.clients[stopIdx].Close()
+	clus.Members[stopIdx].Stop(t)
+	clus.Members[stopIdx].Restart(t)
+	c, cerr := NewClientV3(clus.Members[stopIdx])
+	if cerr != nil {
+		t.Fatal(cerr)
+	}
+	clus.clients[stopIdx] = c
+
+	ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second)
+	defer cancel()
+	reqput := &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}
+	_, err := kvc.Put(ctx, reqput)
+	if err != nil && err == ctx.Err() {
+		t.Fatalf("expected grpc error, got local ctx error (%v)", err)
+	}
+}
+
 // TestV3CompactCurrentRev ensures keys are present when compacting on current revision.
 func TestV3CompactCurrentRev(t *testing.T) {
 	defer testutil.AfterTest(t)