Explorar el Código

concurrency: fetch current lock holder when creating waitlist key

The uncontended path for a mutex would fetch the minimum
revision key on the prefix after creating its entry in
the wait list. This fetch can be rolled into the txn for
creating the wait key, eliminating a round-trip for immediately
acquiring the lock.
Anthony Romano hace 8 años
padre
commit
f5b96991a1
Se han modificado 1 ficheros con 9 adiciones y 1 borrados
  1. 9 1
      clientv3/concurrency/mutex.go

+ 9 - 1
clientv3/concurrency/mutex.go

@@ -49,7 +49,9 @@ func (m *Mutex) Lock(ctx context.Context) error {
 	put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
 	// reuse key in case this session already holds the lock
 	get := v3.OpGet(m.myKey)
-	resp, err := client.Txn(ctx).If(cmp).Then(put).Else(get).Commit()
+	// fetch current holder to complete uncontended path with only one RPC
+	getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
+	resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
 	if err != nil {
 		return err
 	}
@@ -57,6 +59,12 @@ func (m *Mutex) Lock(ctx context.Context) error {
 	if !resp.Succeeded {
 		m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
 	}
+	// if no key on prefix / the minimum rev is key, already hold the lock
+	ownerKey := resp.Responses[1].GetResponseRange().Kvs
+	if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
+		m.hdr = resp.Header
+		return nil
+	}
 
 	// wait for deletion revisions prior to myKey
 	hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)