Browse Source

concurrency: use create max revision for locks and elections

Anthony Romano 9 years ago
parent
commit
9b1fe45853
3 changed files with 14 additions and 27 deletions
  1. 1 1
      clientv3/concurrency/election.go
  2. 12 25
      clientv3/concurrency/key.go
  3. 1 1
      clientv3/concurrency/mutex.go

+ 1 - 1
clientv3/concurrency/election.go

@@ -69,7 +69,7 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
 		}
 	}
 
-	err = waitDeletes(ctx, client, e.keyPrefix, v3.WithPrefix(), v3.WithRev(e.leaderRev-1))
+	err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
 	if err != nil {
 		// clean up in case of context cancel
 		select {

+ 12 - 25
clientv3/concurrency/key.go

@@ -16,7 +16,6 @@ package concurrency
 
 import (
 	"fmt"
-	"math"
 
 	v3 "github.com/coreos/etcd/clientv3"
 	"github.com/coreos/etcd/mvcc/mvccpb"
@@ -39,39 +38,27 @@ func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) e
 	if err := wr.Err(); err != nil {
 		return err
 	}
-
 	if err := ctx.Err(); err != nil {
 		return err
 	}
 	return fmt.Errorf("lost watcher waiting for delete")
 }
 
-// waitDeletes efficiently waits until all keys matched by Get(key, opts...) are deleted
-func waitDeletes(ctx context.Context, client *v3.Client, key string, opts ...v3.OpOption) error {
-	getOpts := []v3.OpOption{v3.WithSort(v3.SortByCreateRevision, v3.SortAscend)}
-	getOpts = append(getOpts, opts...)
-	resp, err := client.Get(ctx, key, getOpts...)
-	maxRev := int64(math.MaxInt64)
-	getOpts = append(getOpts, v3.WithRev(0))
-	for err == nil {
-		for len(resp.Kvs) > 0 {
-			i := len(resp.Kvs) - 1
-			if resp.Kvs[i].CreateRevision <= maxRev {
-				break
-			}
-			resp.Kvs = resp.Kvs[:i]
+// waitDeletes efficiently waits until all keys matching the prefix and no greater
+// than the create revision.
+func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) error {
+	getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
+	for {
+		resp, err := client.Get(ctx, pfx, getOpts...)
+		if err != nil {
+			return err
 		}
 		if len(resp.Kvs) == 0 {
-			break
+			return nil
 		}
-		lastKV := resp.Kvs[len(resp.Kvs)-1]
-		maxRev = lastKV.CreateRevision
-		err = waitDelete(ctx, client, string(lastKV.Key), maxRev)
-		if err != nil || len(resp.Kvs) == 1 {
-			break
+		lastKey := string(resp.Kvs[0].Key)
+		if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
+			return err
 		}
-		getOpts = append(getOpts, v3.WithLimit(int64(len(resp.Kvs)-1)))
-		resp, err = client.Get(ctx, key, getOpts...)
 	}
-	return err
 }

+ 1 - 1
clientv3/concurrency/mutex.go

@@ -57,7 +57,7 @@ func (m *Mutex) Lock(ctx context.Context) error {
 	}
 
 	// wait for deletion revisions prior to myKey
-	err = waitDeletes(ctx, client, m.pfx, v3.WithPrefix(), v3.WithRev(m.myRev-1))
+	err = waitDeletes(ctx, client, m.pfx, m.myRev-1)
 	// release lock key if cancelled
 	select {
 	case <-ctx.Done():