|
|
@@ -23,6 +23,8 @@ import (
|
|
|
"github.com/coreos/etcd/clientv3"
|
|
|
"github.com/coreos/etcd/clientv3/concurrency"
|
|
|
"github.com/coreos/etcd/contrib/recipes"
|
|
|
+ "github.com/coreos/etcd/mvcc/mvccpb"
|
|
|
+ "github.com/coreos/etcd/pkg/testutil"
|
|
|
"golang.org/x/net/context"
|
|
|
)
|
|
|
|
|
|
@@ -101,6 +103,117 @@ func TestMutexSessionRelock(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// TestMutexWaitsOnCurrentHolder ensures a mutex is only acquired once all
|
|
|
+// waiters older than the new owner are gone by testing the case where
|
|
|
+// the waiter prior to the acquirer expires before the current holder.
|
|
|
+func TestMutexWaitsOnCurrentHolder(t *testing.T) {
|
|
|
+ defer testutil.AfterTest(t)
|
|
|
+
|
|
|
+ clus := NewClusterV3(t, &ClusterConfig{Size: 1})
|
|
|
+ defer clus.Terminate(t)
|
|
|
+
|
|
|
+ cctx := context.Background()
|
|
|
+
|
|
|
+ cli := clus.Client(0)
|
|
|
+
|
|
|
+ firstOwnerSession, err := concurrency.NewSession(cli)
|
|
|
+ if err != nil {
|
|
|
+ t.Error(err)
|
|
|
+ }
|
|
|
+ defer firstOwnerSession.Close()
|
|
|
+ firstOwnerMutex := concurrency.NewMutex(firstOwnerSession, "test-mutex")
|
|
|
+ if err = firstOwnerMutex.Lock(cctx); err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ victimSession, err := concurrency.NewSession(cli)
|
|
|
+ if err != nil {
|
|
|
+ t.Error(err)
|
|
|
+ }
|
|
|
+ defer victimSession.Close()
|
|
|
+ victimDonec := make(chan struct{})
|
|
|
+ go func() {
|
|
|
+ defer close(victimDonec)
|
|
|
+ concurrency.NewMutex(victimSession, "test-mutex").Lock(cctx)
|
|
|
+ }()
|
|
|
+
|
|
|
+ // ensure mutexes associated with firstOwnerSession and victimSession waits before new owner
|
|
|
+ wch := cli.Watch(cctx, "test-mutex", clientv3.WithPrefix(), clientv3.WithRev(1))
|
|
|
+ putCounts := 0
|
|
|
+ for putCounts < 2 {
|
|
|
+ select {
|
|
|
+ case wrp := <-wch:
|
|
|
+ putCounts += len(wrp.Events)
|
|
|
+ case <-time.After(time.Second):
|
|
|
+ t.Fatal("failed to receive watch response")
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if putCounts != 2 {
|
|
|
+ t.Fatalf("expect 2 put events, but got %v", putCounts)
|
|
|
+ }
|
|
|
+
|
|
|
+ newOwnerSession, err := concurrency.NewSession(cli)
|
|
|
+ if err != nil {
|
|
|
+ t.Error(err)
|
|
|
+ }
|
|
|
+ defer newOwnerSession.Close()
|
|
|
+ newOwnerDonec := make(chan struct{})
|
|
|
+ go func() {
|
|
|
+ defer close(newOwnerDonec)
|
|
|
+ concurrency.NewMutex(newOwnerSession, "test-mutex").Lock(cctx)
|
|
|
+ }()
|
|
|
+
|
|
|
+ select {
|
|
|
+ case wrp := <-wch:
|
|
|
+ if len(wrp.Events) != 1 {
|
|
|
+ t.Fatalf("expect a event, but got %v events", len(wrp.Events))
|
|
|
+ }
|
|
|
+ if e := wrp.Events[0]; e.Type != mvccpb.PUT {
|
|
|
+ t.Fatalf("expect a put event on prefix test-mutex, but got event type %v", e.Type)
|
|
|
+ }
|
|
|
+ case <-time.After(time.Second):
|
|
|
+ t.Fatalf("failed to receive a watch response")
|
|
|
+ }
|
|
|
+
|
|
|
+ // simulate losing the client that's next in line to acquire the lock
|
|
|
+ victimSession.Close()
|
|
|
+
|
|
|
+ // ensures the deletion of victim waiter from server side.
|
|
|
+ select {
|
|
|
+ case wrp := <-wch:
|
|
|
+ if len(wrp.Events) != 1 {
|
|
|
+ t.Fatalf("expect a event, but got %v events", len(wrp.Events))
|
|
|
+ }
|
|
|
+ if e := wrp.Events[0]; e.Type != mvccpb.DELETE {
|
|
|
+ t.Fatalf("expect a delete event on prefix test-mutex, but got event type %v", e.Type)
|
|
|
+ }
|
|
|
+ case <-time.After(time.Second):
|
|
|
+ t.Fatal("failed to receive a watch response")
|
|
|
+ }
|
|
|
+
|
|
|
+ select {
|
|
|
+ case <-newOwnerDonec:
|
|
|
+ t.Fatal("new owner obtained lock before first owner unlocked")
|
|
|
+ default:
|
|
|
+ }
|
|
|
+
|
|
|
+ if err := firstOwnerMutex.Unlock(cctx); err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ select {
|
|
|
+ case <-newOwnerDonec:
|
|
|
+ case <-time.After(time.Second):
|
|
|
+ t.Fatal("new owner failed to obtain lock")
|
|
|
+ }
|
|
|
+
|
|
|
+ select {
|
|
|
+ case <-victimDonec:
|
|
|
+ case <-time.After(time.Second):
|
|
|
+ t.Fatal("victim mutex failed to exit after first owner releases lock")
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func BenchmarkMutex4Waiters(b *testing.B) {
|
|
|
// XXX switch tests to use TB interface
|
|
|
clus := NewClusterV3(nil, &ClusterConfig{Size: 3})
|