Browse Source

Merge pull request #6423 from heyitsanthony/fix-rwmutex

recipes: fix rwmutex locking
Anthony Romano 9 years ago
parent
commit
c74ac99871
2 changed files with 27 additions and 45 deletions
  1. 26 44
      contrib/recipes/rwmutex.go
  2. 1 1
      integration/v3_lock_test.go

+ 26 - 44
contrib/recipes/rwmutex.go

@@ -25,81 +25,63 @@ type RWMutex struct {
 	s   *concurrency.Session
 	s   *concurrency.Session
 	ctx context.Context
 	ctx context.Context
 
 
-	key   string
+	pfx   string
 	myKey *EphemeralKV
 	myKey *EphemeralKV
 }
 }
 
 
-func NewRWMutex(s *concurrency.Session, key string) *RWMutex {
-	return &RWMutex{s, context.TODO(), key, nil}
+func NewRWMutex(s *concurrency.Session, prefix string) *RWMutex {
+	return &RWMutex{s, context.TODO(), prefix + "/", nil}
 }
 }
 
 
 func (rwm *RWMutex) RLock() error {
 func (rwm *RWMutex) RLock() error {
-	client := rwm.s.Client()
-
-	rk, err := NewUniqueEphemeralKey(rwm.s, rwm.key+"/read")
+	rk, err := NewUniqueEphemeralKey(rwm.s, rwm.pfx+"read")
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 	rwm.myKey = rk
 	rwm.myKey = rk
-
-	// if there are nodes with "write-" and a lower
-	// revision number than us we must wait
-	resp, err := client.Get(rwm.ctx, rwm.key+"/write", v3.WithFirstRev()...)
-	if err != nil {
-		return err
-	}
-	if len(resp.Kvs) == 0 || resp.Kvs[0].ModRevision > rk.Revision() {
-		// no blocking since no write key
-		return nil
+	// wait until nodes with "write-" and a lower revision number than myKey are gone
+	for {
+		if done, werr := rwm.waitOnLastRev(rwm.pfx + "write"); done || werr != nil {
+			return werr
+		}
 	}
 	}
-	return rwm.waitOnLowest()
 }
 }
 
 
 func (rwm *RWMutex) Lock() error {
 func (rwm *RWMutex) Lock() error {
-	client := rwm.s.Client()
-
-	rk, err := NewUniqueEphemeralKey(rwm.s, rwm.key+"/write")
+	rk, err := NewUniqueEphemeralKey(rwm.s, rwm.pfx+"write")
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 	rwm.myKey = rk
 	rwm.myKey = rk
-
+	// wait until all keys of lower revision than myKey are gone
 	for {
 	for {
-		// find any key of lower rev number blocks the write lock
-		opts := append(v3.WithLastRev(), v3.WithRev(rk.Revision()-1))
-		resp, err := client.Get(rwm.ctx, rwm.key, opts...)
-		if err != nil {
-			return err
-		}
-		if len(resp.Kvs) == 0 {
-			// no matching for revision before myKey; acquired
-			break
-		}
-		if err := rwm.waitOnLowest(); err != nil {
-			return err
+		if done, werr := rwm.waitOnLastRev(rwm.pfx); done || werr != nil {
+			return werr
 		}
 		}
-		//  get the new lowest, etc until this is the only one left
+		//  get the new lowest key until this is the only one left
 	}
 	}
-
-	return nil
 }
 }
 
 
-func (rwm *RWMutex) waitOnLowest() error {
+// waitOnLowest will wait on the last key with a revision < rwm.myKey.Revision with a
+// given prefix. If there are no keys left to wait on, return true.
+func (rwm *RWMutex) waitOnLastRev(pfx string) (bool, error) {
 	client := rwm.s.Client()
 	client := rwm.s.Client()
-
-	// must block; get key before ek for waiting
-	opts := append(v3.WithLastRev(), v3.WithRev(rwm.myKey.Revision()-1))
-	lastKey, err := client.Get(rwm.ctx, rwm.key, opts...)
+	// get key that's blocking myKey
+	opts := append(v3.WithLastRev(), v3.WithMaxModRev(rwm.myKey.Revision()-1))
+	lastKey, err := client.Get(rwm.ctx, pfx, opts...)
 	if err != nil {
 	if err != nil {
-		return err
+		return false, err
+	}
+	if len(lastKey.Kvs) == 0 {
+		return true, nil
 	}
 	}
-	// wait for release on prior key
+	// wait for release on blocking key
 	_, err = WaitEvents(
 	_, err = WaitEvents(
 		client,
 		client,
 		string(lastKey.Kvs[0].Key),
 		string(lastKey.Kvs[0].Key),
 		rwm.myKey.Revision(),
 		rwm.myKey.Revision(),
 		[]mvccpb.Event_EventType{mvccpb.DELETE})
 		[]mvccpb.Event_EventType{mvccpb.DELETE})
-	return err
+	return false, err
 }
 }
 
 
 func (rwm *RWMutex) RUnlock() error { return rwm.myKey.Delete() }
 func (rwm *RWMutex) RUnlock() error { return rwm.myKey.Delete() }

+ 1 - 1
integration/v3_lock_test.go

@@ -133,7 +133,7 @@ func testRWMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client
 				t.Error(err)
 				t.Error(err)
 			}
 			}
 			rwm := recipe.NewRWMutex(session, "test-rwmutex")
 			rwm := recipe.NewRWMutex(session, "test-rwmutex")
-			if rand.Intn(1) == 0 {
+			if rand.Intn(2) == 0 {
 				if err := rwm.RLock(); err != nil {
 				if err := rwm.RLock(); err != nil {
 					t.Fatalf("could not rlock (%v)", err)
 					t.Fatalf("could not rlock (%v)", err)
 				}
 				}