Browse Source

clientv3/concurrency: allow election on prefixes of keys.

After winning an election or obtaining a lock, we
auto-append a slash after the provided key prefix.
This avoids the previous deadlock due to waiting
on the wrong key.

Fixes #6278
Jason E. Aten 9 years ago
parent
commit
9497e9678c

+ 2 - 3
clientv3/concurrency/election.go

@@ -40,7 +40,7 @@ type Election struct {
 
 
 // NewElection returns a new election on a given key prefix.
 // NewElection returns a new election on a given key prefix.
 func NewElection(s *Session, pfx string) *Election {
 func NewElection(s *Session, pfx string) *Election {
-	return &Election{session: s, keyPrefix: pfx}
+	return &Election{session: s, keyPrefix: pfx + "/"}
 }
 }
 
 
 // Campaign puts a value as eligible for the election. It blocks until
 // Campaign puts a value as eligible for the election. It blocks until
@@ -49,7 +49,7 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
 	s := e.session
 	s := e.session
 	client := e.session.Client()
 	client := e.session.Client()
 
 
-	k := fmt.Sprintf("%s/%x", e.keyPrefix, s.Lease())
+	k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
 	txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
 	txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
 	txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
 	txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
 	txn = txn.Else(v3.OpGet(k))
 	txn = txn.Else(v3.OpGet(k))
@@ -57,7 +57,6 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-
 	e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
 	e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
 	if !resp.Succeeded {
 	if !resp.Succeeded {
 		kv := resp.Responses[0].GetResponseRange().Kvs[0]
 		kv := resp.Responses[0].GetResponseRange().Kvs[0]

+ 2 - 2
clientv3/concurrency/mutex.go

@@ -32,7 +32,7 @@ type Mutex struct {
 }
 }
 
 
 func NewMutex(s *Session, pfx string) *Mutex {
 func NewMutex(s *Session, pfx string) *Mutex {
-	return &Mutex{s, pfx, "", -1}
+	return &Mutex{s, pfx + "/", "", -1}
 }
 }
 
 
 // Lock locks the mutex with a cancellable context. If the context is cancelled
 // Lock locks the mutex with a cancellable context. If the context is cancelled
@@ -41,7 +41,7 @@ func (m *Mutex) Lock(ctx context.Context) error {
 	s := m.s
 	s := m.s
 	client := m.s.Client()
 	client := m.s.Client()
 
 
-	m.myKey = fmt.Sprintf("%s/%x", m.pfx, s.Lease())
+	m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
 	cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
 	cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
 	// put self in lock waiters via myKey; oldest waiter holds lock
 	// put self in lock waiters via myKey; oldest waiter holds lock
 	put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
 	put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))

+ 28 - 0
integration/v3_election_test.go

@@ -197,3 +197,31 @@ func TestElectionSessionRecampaign(t *testing.T) {
 		t.Fatalf("expected value=%q, got response %v", "def", resp)
 		t.Fatalf("expected value=%q, got response %v", "def", resp)
 	}
 	}
 }
 }
+
+// TestElectionOnPrefixOfExistingKey checks that a single
+// candidate can be elected on a new key that is a prefix
+// of an existing key. To wit, check for regression
+// of bug #6278. https://github.com/coreos/etcd/issues/6278
+//
+func TestElectionOnPrefixOfExistingKey(t *testing.T) {
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	cli := clus.RandClient()
+	if _, err := cli.Put(context.TODO(), "testa", "value"); err != nil {
+		t.Fatal(err)
+	}
+	s, serr := concurrency.NewSession(cli)
+	if serr != nil {
+		t.Fatal(serr)
+	}
+	e := concurrency.NewElection(s, "test")
+	ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
+	err := e.Campaign(ctx, "abc")
+	cancel()
+	if err != nil {
+		// after 5 seconds, deadlock results in
+		// 'context deadline exceeded' here.
+		t.Fatal(err)
+	}
+}