Browse Source

recipes: fix rwmutex so locking works

Fixes #6408
Anthony Romano 9 years ago
parent
commit
9f829fdab7
1 changed files with 26 additions and 44 deletions
  1. 26 44
      contrib/recipes/rwmutex.go

+ 26 - 44
contrib/recipes/rwmutex.go

@@ -25,81 +25,63 @@ type RWMutex struct {
 	s   *concurrency.Session
 	ctx context.Context
 
-	key   string
+	pfx   string
 	myKey *EphemeralKV
 }
 
-func NewRWMutex(s *concurrency.Session, key string) *RWMutex {
-	return &RWMutex{s, context.TODO(), key, nil}
+func NewRWMutex(s *concurrency.Session, prefix string) *RWMutex {
+	return &RWMutex{s, context.TODO(), prefix + "/", nil}
 }
 
 func (rwm *RWMutex) RLock() error {
-	client := rwm.s.Client()
-
-	rk, err := NewUniqueEphemeralKey(rwm.s, rwm.key+"/read")
+	rk, err := NewUniqueEphemeralKey(rwm.s, rwm.pfx+"read")
 	if err != nil {
 		return err
 	}
 	rwm.myKey = rk
-
-	// if there are nodes with "write-" and a lower
-	// revision number than us we must wait
-	resp, err := client.Get(rwm.ctx, rwm.key+"/write", v3.WithFirstRev()...)
-	if err != nil {
-		return err
-	}
-	if len(resp.Kvs) == 0 || resp.Kvs[0].ModRevision > rk.Revision() {
-		// no blocking since no write key
-		return nil
+	// wait until nodes with "write-" and a lower revision number than myKey are gone
+	for {
+		if done, werr := rwm.waitOnLastRev(rwm.pfx + "write"); done || werr != nil {
+			return werr
+		}
 	}
-	return rwm.waitOnLowest()
 }
 
 func (rwm *RWMutex) Lock() error {
-	client := rwm.s.Client()
-
-	rk, err := NewUniqueEphemeralKey(rwm.s, rwm.key+"/write")
+	rk, err := NewUniqueEphemeralKey(rwm.s, rwm.pfx+"write")
 	if err != nil {
 		return err
 	}
 	rwm.myKey = rk
-
+	// wait until all keys of lower revision than myKey are gone
 	for {
-		// find any key of lower rev number blocks the write lock
-		opts := append(v3.WithLastRev(), v3.WithRev(rk.Revision()-1))
-		resp, err := client.Get(rwm.ctx, rwm.key, opts...)
-		if err != nil {
-			return err
-		}
-		if len(resp.Kvs) == 0 {
-			// no matching for revision before myKey; acquired
-			break
-		}
-		if err := rwm.waitOnLowest(); err != nil {
-			return err
+		if done, werr := rwm.waitOnLastRev(rwm.pfx); done || werr != nil {
+			return werr
 		}
-		//  get the new lowest, etc until this is the only one left
+		//  get the new lowest key until this is the only one left
 	}
-
-	return nil
 }
 
-func (rwm *RWMutex) waitOnLowest() error {
+// waitOnLowest will wait on the last key with a revision < rwm.myKey.Revision with a
+// given prefix. If there are no keys left to wait on, return true.
+func (rwm *RWMutex) waitOnLastRev(pfx string) (bool, error) {
 	client := rwm.s.Client()
-
-	// must block; get key before ek for waiting
-	opts := append(v3.WithLastRev(), v3.WithRev(rwm.myKey.Revision()-1))
-	lastKey, err := client.Get(rwm.ctx, rwm.key, opts...)
+	// get key that's blocking myKey
+	opts := append(v3.WithLastRev(), v3.WithMaxModRev(rwm.myKey.Revision()-1))
+	lastKey, err := client.Get(rwm.ctx, pfx, opts...)
 	if err != nil {
-		return err
+		return false, err
+	}
+	if len(lastKey.Kvs) == 0 {
+		return true, nil
 	}
-	// wait for release on prior key
+	// wait for release on blocking key
 	_, err = WaitEvents(
 		client,
 		string(lastKey.Kvs[0].Key),
 		rwm.myKey.Revision(),
 		[]mvccpb.Event_EventType{mvccpb.DELETE})
-	return err
+	return false, err
 }
 
 func (rwm *RWMutex) RUnlock() error { return rwm.myKey.Delete() }