浏览代码

clientv3/concurrency: use session id for election keys to avoid deadlock

Anthony Romano 9 年之前
父节点
当前提交
120020fa9c
共有 3 个文件被更改,包括 46 次插入27 次删除
  1. 23 5
      clientv3/concurrency/election.go
  2. 0 22
      clientv3/concurrency/key.go
  3. 23 0
      integration/v3_election_test.go

+ 23 - 5
clientv3/concurrency/election.go

@@ -16,6 +16,7 @@ package concurrency
 
 
 import (
 import (
 	"errors"
 	"errors"
+	"fmt"
 
 
 	v3 "github.com/coreos/etcd/clientv3"
 	v3 "github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/mvcc/mvccpb"
 	"github.com/coreos/etcd/mvcc/mvccpb"
@@ -50,22 +51,39 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
 		return serr
 		return serr
 	}
 	}
 
 
-	k, rev, err := NewUniqueKV(ctx, e.client, e.keyPrefix, val, v3.WithLease(s.Lease()))
-	if err == nil {
-		err = waitDeletes(ctx, e.client, e.keyPrefix, v3.WithPrefix(), v3.WithRev(rev-1))
+	k := fmt.Sprintf("%s/%x", e.keyPrefix, s.Lease())
+	txn := e.client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
+	txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
+	txn = txn.Else(v3.OpGet(k))
+	resp, err := txn.Commit()
+	if err != nil {
+		return err
+	}
+
+	e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
+	if !resp.Succeeded {
+		kv := resp.Responses[0].GetResponseRange().Kvs[0]
+		e.leaderRev = kv.CreateRevision
+		if string(kv.Value) != val {
+			if err = e.Proclaim(ctx, val); err != nil {
+				e.Resign(ctx)
+				return err
+			}
+		}
 	}
 	}
 
 
+	err = waitDeletes(ctx, e.client, e.keyPrefix, v3.WithPrefix(), v3.WithRev(e.leaderRev-1))
 	if err != nil {
 	if err != nil {
 		// clean up in case of context cancel
 		// clean up in case of context cancel
 		select {
 		select {
 		case <-ctx.Done():
 		case <-ctx.Done():
-			e.client.Delete(e.client.Ctx(), k)
+			e.Resign(e.client.Ctx())
 		default:
 		default:
+			e.leaderSession = nil
 		}
 		}
 		return err
 		return err
 	}
 	}
 
 
-	e.leaderKey, e.leaderRev, e.leaderSession = k, rev, s
 	return nil
 	return nil
 }
 }
 
 

+ 0 - 22
clientv3/concurrency/key.go

@@ -17,34 +17,12 @@ package concurrency
 import (
 import (
 	"fmt"
 	"fmt"
 	"math"
 	"math"
-	"time"
 
 
 	v3 "github.com/coreos/etcd/clientv3"
 	v3 "github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/mvcc/mvccpb"
 	"github.com/coreos/etcd/mvcc/mvccpb"
 	"golang.org/x/net/context"
 	"golang.org/x/net/context"
 )
 )
 
 
-// NewUniqueKey creates a new key from a given prefix.
-func NewUniqueKey(ctx context.Context, kv v3.KV, pfx string, opts ...v3.OpOption) (string, int64, error) {
-	return NewUniqueKV(ctx, kv, pfx, "", opts...)
-}
-
-func NewUniqueKV(ctx context.Context, kv v3.KV, pfx, val string, opts ...v3.OpOption) (string, int64, error) {
-	for {
-		newKey := fmt.Sprintf("%s/%v", pfx, time.Now().UnixNano())
-		put := v3.OpPut(newKey, val, opts...)
-		cmp := v3.Compare(v3.ModRevision(newKey), "=", 0)
-		resp, err := kv.Txn(ctx).If(cmp).Then(put).Commit()
-		if err != nil {
-			return "", 0, err
-		}
-		if !resp.Succeeded {
-			continue
-		}
-		return newKey, resp.Header.Revision, nil
-	}
-}
-
 func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
 func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
 	cctx, cancel := context.WithCancel(ctx)
 	cctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 	defer cancel()

+ 23 - 0
integration/v3_election_test.go

@@ -146,3 +146,26 @@ func TestElectionFailover(t *testing.T) {
 	// leader must ack election (otherwise, Campaign may see closed conn)
 	// leader must ack election (otherwise, Campaign may see closed conn)
 	<-electedc
 	<-electedc
 }
 }
+
+// TestElectionSessionRelock ensures that campaigning twice on the same election
+// with the same lock will Proclaim instead of deadlocking.
+func TestElectionSessionRecampaign(t *testing.T) {
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+	cli := clus.RandClient()
+
+	e := concurrency.NewElection(cli, "test-elect")
+	if err := e.Campaign(context.TODO(), "abc"); err != nil {
+		t.Fatal(err)
+	}
+	e2 := concurrency.NewElection(cli, "test-elect")
+	if err := e2.Campaign(context.TODO(), "def"); err != nil {
+		t.Fatal(err)
+	}
+
+	ctx, cancel := context.WithCancel(context.TODO())
+	defer cancel()
+	if resp := <-e.Observe(ctx); len(resp.Kvs) == 0 || string(resp.Kvs[0].Value) != "def" {
+		t.Fatalf("expected value=%q, got response %v", "def", resp)
+	}
+}