浏览代码

Merge pull request #7819 from heyitsanthony/fix-elect-compact

concurrency: use current revisions for election
Anthony Romano 8 年之前
父节点
当前提交
c309d745a6
共有 2 个文件被更改,包括 50 次插入7 次删除
  1. 14 7
      clientv3/concurrency/election.go
  2. 36 0
      integration/v3_election_test.go

+ 14 - 7
clientv3/concurrency/election.go

@@ -165,15 +165,14 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
 	client := e.session.Client()
 
 	defer close(ch)
-	lastRev := int64(0)
 	for {
-		opts := append(v3.WithFirstCreate(), v3.WithRev(lastRev))
-		resp, err := client.Get(ctx, e.keyPrefix, opts...)
+		resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
 		if err != nil {
 			return
 		}
 
 		var kv *mvccpb.KeyValue
+		var hdr *pb.ResponseHeader
 
 		if len(resp.Kvs) == 0 {
 			cctx, cancel := context.WithCancel(ctx)
@@ -189,18 +188,27 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
 				// only accept PUTs; a DELETE will make observe() spin
 				for _, ev := range wr.Events {
 					if ev.Type == mvccpb.PUT {
-						kv = ev.Kv
+						hdr, kv = &wr.Header, ev.Kv
+						// may have multiple revs; hdr.rev = the last rev
+						// set to kv's rev in case batch has multiple PUTs
+						hdr.Revision = kv.ModRevision
 						break
 					}
 				}
 			}
 			cancel()
 		} else {
-			kv = resp.Kvs[0]
+			hdr, kv = resp.Header, resp.Kvs[0]
+		}
+
+		select {
+		case ch <- v3.GetResponse{Header: hdr, Kvs: []*mvccpb.KeyValue{kv}}:
+		case <-ctx.Done():
+			return
 		}
 
 		cctx, cancel := context.WithCancel(ctx)
-		wch := client.Watch(cctx, string(kv.Key), v3.WithRev(kv.ModRevision))
+		wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))
 		keyDeleted := false
 		for !keyDeleted {
 			wr, ok := <-wch
@@ -209,7 +217,6 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
 			}
 			for _, ev := range wr.Events {
 				if ev.Type == mvccpb.DELETE {
-					lastRev = ev.Kv.ModRevision
 					keyDeleted = true
 					break
 				}

+ 36 - 0
integration/v3_election_test.go

@@ -272,3 +272,39 @@ func TestElectionOnSessionRestart(t *testing.T) {
 		t.Errorf("expected value=%q, got response %v", "def", resp)
 	}
 }
+
+// TestElectionObserveCompacted checks that observe can tolerate
+// a leader key with a modrev less than the compaction revision.
+func TestElectionObserveCompacted(t *testing.T) {
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	cli := clus.Client(0)
+
+	session, err := concurrency.NewSession(cli)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer session.Orphan()
+
+	e := concurrency.NewElection(session, "test-elect")
+	if cerr := e.Campaign(context.TODO(), "abc"); cerr != nil {
+		t.Fatal(cerr)
+	}
+
+	presp, perr := cli.Put(context.TODO(), "foo", "bar")
+	if perr != nil {
+		t.Fatal(perr)
+	}
+	if _, cerr := cli.Compact(context.TODO(), presp.Header.Revision); cerr != nil {
+		t.Fatal(cerr)
+	}
+
+	v, ok := <-e.Observe(context.TODO())
+	if !ok {
+		t.Fatal("failed to observe on compacted revision")
+	}
+	if string(v.Kvs[0].Value) != "abc" {
+		t.Fatalf(`expected leader value "abc", got %q`, string(v.Kvs[0].Value))
+	}
+}