
session: remove session manager and add ttl

Xiang Li 9 years ago
parent
commit
feaff17259
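
This commit drops the implicit one-session-per-client manager: the concurrency primitives now take a caller-owned *concurrency.Session instead of a *clientv3.Client, and NewSession accepts a TTL option. A minimal sketch of the new API (the endpoint and lock name are illustrative, not part of this commit):

package main

import (
	"log"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/clientv3/concurrency"
	"golang.org/x/net/context"
)

func main() {
	// hypothetical endpoint; any reachable etcd v3 server works
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// the caller now owns the session and decides when its lease ends
	s, err := concurrency.NewSession(cli, concurrency.WithTTL(30))
	if err != nil {
		log.Fatal(err)
	}
	defer s.Close() // revokes the lease, releasing every key bound to it

	m := concurrency.NewMutex(s, "my-lock")
	if err := m.Lock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	defer m.Unlock(context.TODO())
}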

+ 19 - 16
clientv3/concurrency/election.go

@@ -29,7 +29,7 @@ var (
 )
 
 type Election struct {
-	client *v3.Client
+	session *Session
 
 	keyPrefix string
 
@@ -39,20 +39,18 @@ type Election struct {
 }
 
 // NewElection returns a new election on a given key prefix.
-func NewElection(client *v3.Client, pfx string) *Election {
-	return &Election{client: client, keyPrefix: pfx}
+func NewElection(s *Session, pfx string) *Election {
+	return &Election{session: s, keyPrefix: pfx}
 }
 
 // Campaign puts a value as eligible for the election. It blocks until
 // it is elected, an error occurs, or the context is cancelled.
 func (e *Election) Campaign(ctx context.Context, val string) error {
-	s, serr := NewSession(e.client)
-	if serr != nil {
-		return serr
-	}
+	s := e.session
+	client := e.session.Client()
 
 	k := fmt.Sprintf("%s/%x", e.keyPrefix, s.Lease())
-	txn := e.client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
+	txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
 	txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
 	txn = txn.Else(v3.OpGet(k))
 	resp, err := txn.Commit()
@@ -72,12 +70,12 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
 		}
 	}
 
-	err = waitDeletes(ctx, e.client, e.keyPrefix, v3.WithPrefix(), v3.WithRev(e.leaderRev-1))
+	err = waitDeletes(ctx, client, e.keyPrefix, v3.WithPrefix(), v3.WithRev(e.leaderRev-1))
 	if err != nil {
 		// clean up in case of context cancel
 		select {
 		case <-ctx.Done():
-			e.Resign(e.client.Ctx())
+			e.Resign(client.Ctx())
 		default:
 			e.leaderSession = nil
 		}
@@ -92,8 +90,9 @@ func (e *Election) Proclaim(ctx context.Context, val string) error {
 	if e.leaderSession == nil {
 		return ErrElectionNotLeader
 	}
+	client := e.session.Client()
 	cmp := v3.Compare(v3.CreateRevision(e.leaderKey), "=", e.leaderRev)
-	txn := e.client.Txn(ctx).If(cmp)
+	txn := client.Txn(ctx).If(cmp)
 	txn = txn.Then(v3.OpPut(e.leaderKey, val, v3.WithLease(e.leaderSession.Lease())))
 	tresp, terr := txn.Commit()
 	if terr != nil {
@@ -111,7 +110,8 @@ func (e *Election) Resign(ctx context.Context) (err error) {
 	if e.leaderSession == nil {
 		return nil
 	}
-	_, err = e.client.Delete(ctx, e.leaderKey)
+	client := e.session.Client()
+	_, err = client.Delete(ctx, e.leaderKey)
 	e.leaderKey = ""
 	e.leaderSession = nil
 	return err
@@ -119,7 +119,8 @@ func (e *Election) Resign(ctx context.Context) (err error) {
 
 // Leader returns the leader value for the current election.
 func (e *Election) Leader(ctx context.Context) (string, error) {
-	resp, err := e.client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
+	client := e.session.Client()
+	resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
 	if err != nil {
 		return "", err
 	} else if len(resp.Kvs) == 0 {
@@ -139,9 +140,11 @@ func (e *Election) Observe(ctx context.Context) <-chan v3.GetResponse {
 }
 
 func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
+	client := e.session.Client()
+
 	defer close(ch)
 	for {
-		resp, err := e.client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
+		resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)
 		if err != nil {
 			return
 		}
@@ -152,7 +155,7 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
 		if len(resp.Kvs) == 0 {
 			// wait for first key put on prefix
 			opts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()}
-			wch := e.client.Watch(cctx, e.keyPrefix, opts...)
+			wch := client.Watch(cctx, e.keyPrefix, opts...)
 
 			for kv == nil {
 				wr, ok := <-wch
@@ -172,7 +175,7 @@ func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {
 			kv = resp.Kvs[0]
 		}
 
-		wch := e.client.Watch(cctx, string(kv.Key), v3.WithRev(kv.ModRevision))
+		wch := client.Watch(cctx, string(kv.Key), v3.WithRev(kv.ModRevision))
 		keyDeleted := false
 		for !keyDeleted {
 			wr, ok := <-wch
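
An Election is now bound to the session passed in, so Campaign no longer creates its own session and several elections can share one lease. A rough usage fragment, reusing cli and s from the sketch above (names illustrative):

e := concurrency.NewElection(s, "my-election")
if err := e.Campaign(context.TODO(), "candidate-1"); err != nil {
	log.Fatal(err)
}
// Leader returns the value stored under the current leader key
if leader, err := e.Leader(context.TODO()); err == nil {
	log.Println("leader:", leader)
}
if err := e.Resign(context.TODO()); err != nil {
	log.Fatal(err)
}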

+ 16 - 15
clientv3/concurrency/mutex.go

@@ -24,24 +24,22 @@ import (
 
 // Mutex implements the sync Locker interface with etcd
 type Mutex struct {
-	client *v3.Client
+	s *Session
 
 	pfx   string
 	myKey string
 	myRev int64
 }
 
-func NewMutex(client *v3.Client, pfx string) *Mutex {
-	return &Mutex{client, pfx, "", -1}
+func NewMutex(s *Session, pfx string) *Mutex {
+	return &Mutex{s, pfx, "", -1}
 }
 
 // Lock locks the mutex with a cancellable context. If the context is cancelled
 // while trying to acquire the lock, the mutex tries to clean its stale lock entry.
 func (m *Mutex) Lock(ctx context.Context) error {
-	s, serr := NewSession(m.client)
-	if serr != nil {
-		return serr
-	}
+	s := m.s
+	client := m.s.Client()
 
 	m.myKey = fmt.Sprintf("%s/%x", m.pfx, s.Lease())
 	cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
@@ -49,7 +47,7 @@ func (m *Mutex) Lock(ctx context.Context) error {
 	put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
 	// reuse key in case this session already holds the lock
 	get := v3.OpGet(m.myKey)
-	resp, err := m.client.Txn(ctx).If(cmp).Then(put).Else(get).Commit()
+	resp, err := client.Txn(ctx).If(cmp).Then(put).Else(get).Commit()
 	if err != nil {
 		return err
 	}
@@ -59,18 +57,19 @@ func (m *Mutex) Lock(ctx context.Context) error {
 	}
 
 	// wait for deletion revisions prior to myKey
-	err = waitDeletes(ctx, m.client, m.pfx, v3.WithPrefix(), v3.WithRev(m.myRev-1))
+	err = waitDeletes(ctx, client, m.pfx, v3.WithPrefix(), v3.WithRev(m.myRev-1))
 	// release lock key if cancelled
 	select {
 	case <-ctx.Done():
-		m.Unlock(m.client.Ctx())
+		m.Unlock(client.Ctx())
 	default:
 	}
 	return err
 }
 
 func (m *Mutex) Unlock(ctx context.Context) error {
-	if _, err := m.client.Delete(ctx, m.myKey); err != nil {
+	client := m.s.Client()
+	if _, err := client.Delete(ctx, m.myKey); err != nil {
 		return err
 	}
 	m.myKey = "\x00"
@@ -87,17 +86,19 @@ func (m *Mutex) Key() string { return m.myKey }
 type lockerMutex struct{ *Mutex }
 
 func (lm *lockerMutex) Lock() {
-	if err := lm.Mutex.Lock(lm.client.Ctx()); err != nil {
+	client := lm.s.Client()
+	if err := lm.Mutex.Lock(client.Ctx()); err != nil {
 		panic(err)
 	}
 }
 func (lm *lockerMutex) Unlock() {
-	if err := lm.Mutex.Unlock(lm.client.Ctx()); err != nil {
+	client := lm.s.Client()
+	if err := lm.Mutex.Unlock(client.Ctx()); err != nil {
 		panic(err)
 	}
 }
 
 // NewLocker creates a sync.Locker backed by an etcd mutex.
-func NewLocker(client *v3.Client, pfx string) sync.Locker {
-	return &lockerMutex{NewMutex(client, pfx)}
+func NewLocker(s *Session, pfx string) sync.Locker {
+	return &lockerMutex{NewMutex(s, pfx)}
 }
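
NewLocker follows suit, so a sync.Locker can share its lease with other primitives on the same session. A fragment under the same assumptions as the sketch above:

l := concurrency.NewLocker(s, "my-lock")
l.Lock()
// critical section; the underlying key is bound to s's lease
l.Unlock()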

+ 29 - 24
clientv3/concurrency/session.go

@@ -15,21 +15,11 @@
 package concurrency
 
 import (
-	"sync"
-
 	v3 "github.com/coreos/etcd/clientv3"
 	"golang.org/x/net/context"
 )
 
-// only keep one ephemeral lease per client
-var clientSessions clientSessionMgr = clientSessionMgr{sessions: make(map[*v3.Client]*Session)}
-
-const sessionTTL = 60
-
-type clientSessionMgr struct {
-	sessions map[*v3.Client]*Session
-	mu       sync.Mutex
-}
+const defaultSessionTTL = 60
 
 // Session represents a lease kept alive for the lifetime of a client.
 // Fault-tolerant applications may use sessions to reason about liveness.
@@ -42,14 +32,13 @@ type Session struct {
 }
 
 // NewSession gets the leased session for a client.
-func NewSession(client *v3.Client) (*Session, error) {
-	clientSessions.mu.Lock()
-	defer clientSessions.mu.Unlock()
-	if s, ok := clientSessions.sessions[client]; ok {
-		return s, nil
+func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
+	ops := &sessionOptions{ttl: defaultSessionTTL}
+	for _, opt := range opts {
+		opt(ops)
 	}
 
-	resp, err := client.Grant(client.Ctx(), sessionTTL)
+	resp, err := client.Grant(client.Ctx(), int64(ops.ttl))
 	if err != nil {
 		return nil, err
 	}
@@ -63,16 +52,10 @@ func NewSession(client *v3.Client) (*Session, error) {
 
 	donec := make(chan struct{})
 	s := &Session{client: client, id: id, cancel: cancel, donec: donec}
-	clientSessions.sessions[client] = s
 
 	// keep the lease alive until client error or cancelled context
 	go func() {
-		defer func() {
-			clientSessions.mu.Lock()
-			delete(clientSessions.sessions, client)
-			clientSessions.mu.Unlock()
-			close(donec)
-		}()
+		defer close(donec)
 		for range keepAlive {
 			// eat messages until keep alive channel closes
 		}
@@ -81,6 +64,11 @@ func NewSession(client *v3.Client) (*Session, error) {
 	return s, nil
 }
 
+// Client is the etcd client that is attached to the session.
+func (s *Session) Client() *v3.Client {
+	return s.client
+}
+
 // Lease is the lease ID for keys bound to the session.
 func (s *Session) Lease() v3.LeaseID { return s.id }
 
@@ -102,3 +90,20 @@ func (s *Session) Close() error {
 	_, err := s.client.Revoke(s.client.Ctx(), s.id)
 	return err
 }
+
+type sessionOptions struct {
+	ttl int
+}
+
+// SessionOption configures Session.
+type SessionOption func(*sessionOptions)
+
+// WithTTL configures the session's TTL in seconds.
+// If TTL is <= 0, the default 60 seconds TTL will be used.
+func WithTTL(ttl int) SessionOption {
+	return func(so *sessionOptions) {
+		if ttl > 0 {
+			so.ttl = ttl
+		}
+	}
+}
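
Since WithTTL ignores non-positive values, any option combination still yields a valid lease TTL. An illustrative fragment, reusing cli from the sketch above (error handling elided):

s1, _ := concurrency.NewSession(cli, concurrency.WithTTL(15)) // 15-second lease
s2, _ := concurrency.NewSession(cli, concurrency.WithTTL(0))  // <= 0: falls back to defaultSessionTTL (60s)
defer s1.Close()
defer s2.Close()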

+ 18 - 15
contrib/recipes/double_barrier.go

@@ -16,6 +16,7 @@ package recipe
 
 import (
 	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/clientv3/concurrency"
 	"github.com/coreos/etcd/mvcc/mvccpb"
 	"golang.org/x/net/context"
 )
@@ -23,32 +24,33 @@ import (
 // DoubleBarrier blocks processes on Enter until an expected count enters, then
 // blocks again on Leave until all processes have left.
 type DoubleBarrier struct {
-	client *clientv3.Client
-	ctx    context.Context
+	s   *concurrency.Session
+	ctx context.Context
 
 	key   string // key for the collective barrier
 	count int
 	myKey *EphemeralKV // current key for this process on the barrier
 }
 
-func NewDoubleBarrier(client *clientv3.Client, key string, count int) *DoubleBarrier {
+func NewDoubleBarrier(s *concurrency.Session, key string, count int) *DoubleBarrier {
 	return &DoubleBarrier{
-		client: client,
-		ctx:    context.TODO(),
-		key:    key,
-		count:  count,
+		s:     s,
+		ctx:   context.TODO(),
+		key:   key,
+		count: count,
 	}
 }
 
 // Enter waits for "count" processes to enter the barrier then returns
 func (b *DoubleBarrier) Enter() error {
-	ek, err := NewUniqueEphemeralKey(b.client, b.key+"/waiters")
+	client := b.s.Client()
+	ek, err := NewUniqueEphemeralKey(b.s, b.key+"/waiters")
 	if err != nil {
 		return err
 	}
 	b.myKey = ek
 
-	resp, err := b.client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
+	resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
 	if err != nil {
 		return err
 	}
@@ -59,12 +61,12 @@ func (b *DoubleBarrier) Enter() error {
 
 	if len(resp.Kvs) == b.count {
 		// unblock waiters
-		_, err = b.client.Put(b.ctx, b.key+"/ready", "")
+		_, err = client.Put(b.ctx, b.key+"/ready", "")
 		return err
 	}
 
 	_, err = WaitEvents(
-		b.client,
+		client,
 		b.key+"/ready",
 		ek.Revision(),
 		[]mvccpb.Event_EventType{mvccpb.PUT})
@@ -73,7 +75,8 @@ func (b *DoubleBarrier) Enter() error {
 
 // Leave waits for "count" processes to leave the barrier then returns
 func (b *DoubleBarrier) Leave() error {
-	resp, err := b.client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
+	client := b.s.Client()
+	resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
 	if err != nil {
 		return err
 	}
@@ -94,7 +97,7 @@ func (b *DoubleBarrier) Leave() error {
 
 	if len(resp.Kvs) == 1 {
 		// this is the only node in the barrier; finish up
-		if _, err = b.client.Delete(b.ctx, b.key+"/ready"); err != nil {
+		if _, err = client.Delete(b.ctx, b.key+"/ready"); err != nil {
 			return err
 		}
 		return b.myKey.Delete()
@@ -106,7 +109,7 @@ func (b *DoubleBarrier) Leave() error {
 	// lowest process in node => wait on highest process
 	if isLowest {
 		_, err = WaitEvents(
-			b.client,
+			client,
 			string(highest.Key),
 			highest.ModRevision,
 			[]mvccpb.Event_EventType{mvccpb.DELETE})
@@ -123,7 +126,7 @@ func (b *DoubleBarrier) Leave() error {
 
 	key := string(lowest.Key)
 	_, err = WaitEvents(
-		b.client,
+		client,
 		key,
 		lowest.ModRevision,
 		[]mvccpb.Event_EventType{mvccpb.DELETE})
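
Because the barrier now rides on a caller-supplied session, every key a process creates under the barrier shares one lease and disappears together if that process dies. A hedged fragment under the same assumptions (the count of 5 is illustrative):

b := recipe.NewDoubleBarrier(s, "my-barrier", 5)
if err := b.Enter(); err != nil { // blocks until 5 processes have entered
	log.Fatal(err)
}
// ... coordinated work ...
if err := b.Leave(); err != nil { // blocks until all 5 have left
	log.Fatal(err)
}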

+ 6 - 10
contrib/recipes/key.go

@@ -160,12 +160,8 @@ func (rk *RemoteKV) Put(val string) error {
 type EphemeralKV struct{ RemoteKV }
 
 // NewEphemeralKV creates a new key/value pair associated with a session lease
-func NewEphemeralKV(client *v3.Client, key, val string) (*EphemeralKV, error) {
-	s, err := concurrency.NewSession(client)
-	if err != nil {
-		return nil, err
-	}
-	k, err := NewKV(client, key, val, s.Lease())
+func NewEphemeralKV(s *concurrency.Session, key, val string) (*EphemeralKV, error) {
+	k, err := NewKV(s.Client(), key, val, s.Lease())
 	if err != nil {
 		return nil, err
 	}
@@ -173,15 +169,15 @@ func NewEphemeralKV(client *v3.Client, key, val string) (*EphemeralKV, error) {
 }
 
 // NewUniqueEphemeralKey creates a new unique valueless key associated with a session lease
-func NewUniqueEphemeralKey(client *v3.Client, prefix string) (*EphemeralKV, error) {
-	return NewUniqueEphemeralKV(client, prefix, "")
+func NewUniqueEphemeralKey(s *concurrency.Session, prefix string) (*EphemeralKV, error) {
+	return NewUniqueEphemeralKV(s, prefix, "")
 }
 
 // NewUniqueEphemeralKV creates a new unique key/value pair associated with a session lease
-func NewUniqueEphemeralKV(client *v3.Client, prefix, val string) (ek *EphemeralKV, err error) {
+func NewUniqueEphemeralKV(s *concurrency.Session, prefix, val string) (ek *EphemeralKV, err error) {
 	for {
 		newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
-		ek, err = NewEphemeralKV(client, newKey, val)
+		ek, err = NewEphemeralKV(s, newKey, val)
 		if err == nil || err != ErrKeyExists {
 			break
 		}
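
Ephemeral keys likewise attach to the caller's session lease instead of creating a session per key. A fragment under the same assumptions:

ek, err := recipe.NewUniqueEphemeralKey(s, "my-prefix") // deleted automatically when s's lease expires
if err != nil {
	log.Fatal(err)
}
log.Println("created at revision", ek.Revision())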

+ 17 - 10
contrib/recipes/rwmutex.go

@@ -16,24 +16,27 @@ package recipe
 
 import (
 	v3 "github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/clientv3/concurrency"
 	"github.com/coreos/etcd/mvcc/mvccpb"
 	"golang.org/x/net/context"
 )
 
 type RWMutex struct {
-	client *v3.Client
-	ctx    context.Context
+	s   *concurrency.Session
+	ctx context.Context
 
 	key   string
 	myKey *EphemeralKV
 }
 
-func NewRWMutex(client *v3.Client, key string) *RWMutex {
-	return &RWMutex{client, context.TODO(), key, nil}
+func NewRWMutex(s *concurrency.Session, key string) *RWMutex {
+	return &RWMutex{s, context.TODO(), key, nil}
 }
 
 func (rwm *RWMutex) RLock() error {
-	rk, err := NewUniqueEphemeralKey(rwm.client, rwm.key+"/read")
+	client := rwm.s.Client()
+
+	rk, err := NewUniqueEphemeralKey(rwm.s, rwm.key+"/read")
 	if err != nil {
 		return err
 	}
@@ -41,7 +44,7 @@ func (rwm *RWMutex) RLock() error {
 
 	// if there are nodes with "write-" and a lower
 	// revision number than us we must wait
-	resp, err := rwm.client.Get(rwm.ctx, rwm.key+"/write", v3.WithFirstRev()...)
+	resp, err := client.Get(rwm.ctx, rwm.key+"/write", v3.WithFirstRev()...)
 	if err != nil {
 		return err
 	}
@@ -53,7 +56,9 @@ func (rwm *RWMutex) RLock() error {
 }
 
 func (rwm *RWMutex) Lock() error {
-	rk, err := NewUniqueEphemeralKey(rwm.client, rwm.key+"/write")
+	client := rwm.s.Client()
+
+	rk, err := NewUniqueEphemeralKey(rwm.s, rwm.key+"/write")
 	if err != nil {
 		return err
 	}
@@ -62,7 +67,7 @@ func (rwm *RWMutex) Lock() error {
 	for {
 		// find any key of lower rev number blocks the write lock
 		opts := append(v3.WithLastRev(), v3.WithRev(rk.Revision()-1))
-		resp, err := rwm.client.Get(rwm.ctx, rwm.key, opts...)
+		resp, err := client.Get(rwm.ctx, rwm.key, opts...)
 		if err != nil {
 			return err
 		}
@@ -80,15 +85,17 @@ func (rwm *RWMutex) Lock() error {
 }
 
 func (rwm *RWMutex) waitOnLowest() error {
+	client := rwm.s.Client()
+
 	// must block; get key before ek for waiting
 	opts := append(v3.WithLastRev(), v3.WithRev(rwm.myKey.Revision()-1))
-	lastKey, err := rwm.client.Get(rwm.ctx, rwm.key, opts...)
+	lastKey, err := client.Get(rwm.ctx, rwm.key, opts...)
 	if err != nil {
 		return err
 	}
 	// wait for release on prior key
 	_, err = WaitEvents(
-		rwm.client,
+		client,
 		string(lastKey.Kvs[0].Key),
 		rwm.myKey.Revision(),
 		[]mvccpb.Event_EventType{mvccpb.DELETE})
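
The RWMutex recipe gets the same treatment; a fragment under the same assumptions (RUnlock is part of the existing recipe, not shown in this diff):

rwm := recipe.NewRWMutex(s, "my-rwlock")
if err := rwm.RLock(); err != nil { // shared read lock; the write lock is rwm.Lock()
	log.Fatal(err)
}
defer rwm.RUnlock()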

+ 11 - 3
etcdctl/ctlv3/command/elect_command.go

@@ -64,7 +64,11 @@ func electCommandFunc(cmd *cobra.Command, args []string) {
 }
 
 func observe(c *clientv3.Client, election string) error {
-	e := concurrency.NewElection(c, election)
+	s, err := concurrency.NewSession(c)
+	if err != nil {
+		return err
+	}
+	e := concurrency.NewElection(s, election)
 	ctx, cancel := context.WithCancel(context.TODO())
 
 	donec := make(chan struct{})
@@ -94,7 +98,11 @@ func observe(c *clientv3.Client, election string) error {
 }
 
 func campaign(c *clientv3.Client, election string, prop string) error {
-	e := concurrency.NewElection(c, election)
+	s, err := concurrency.NewSession(c)
+	if err != nil {
+		return err
+	}
+	e := concurrency.NewElection(s, election)
 	ctx, cancel := context.WithCancel(context.TODO())
 
 	donec := make(chan struct{})
@@ -111,7 +119,7 @@ func campaign(c *clientv3.Client, election string, prop string) error {
 		return serr
 	}
 
-	if err := e.Campaign(ctx, prop); err != nil {
+	if err = e.Campaign(ctx, prop); err != nil {
 		return err
 	}
 

+ 6 - 1
etcdctl/ctlv3/command/lock_command.go

@@ -46,7 +46,12 @@ func lockCommandFunc(cmd *cobra.Command, args []string) {
 }
 
 func lockUntilSignal(c *clientv3.Client, lockname string) error {
-	m := concurrency.NewMutex(c, lockname)
+	s, err := concurrency.NewSession(c)
+	if err != nil {
+		return err
+	}
+
+	m := concurrency.NewMutex(s, lockname)
 	ctx, cancel := context.WithCancel(context.TODO())
 
 	// unlock in case of ordinary shutdown

+ 30 - 21
integration/v3_double_barrier_test.go

@@ -25,15 +25,25 @@ import (
 func TestDoubleBarrier(t *testing.T) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
-	defer dropSessionLease(clus)
 
 	waiters := 10
+	session, err := concurrency.NewSession(clus.RandClient())
+	if err != nil {
+		t.Error(err)
+	}
+	defer session.Orphan()
 
-	b := recipe.NewDoubleBarrier(clus.RandClient(), "test-barrier", waiters)
+	b := recipe.NewDoubleBarrier(session, "test-barrier", waiters)
 	donec := make(chan struct{})
 	for i := 0; i < waiters-1; i++ {
 		go func() {
-			bb := recipe.NewDoubleBarrier(clus.RandClient(), "test-barrier", waiters)
+			session, err := concurrency.NewSession(clus.RandClient())
+			if err != nil {
+				t.Error(err)
+			}
+			defer session.Orphan()
+
+			bb := recipe.NewDoubleBarrier(session, "test-barrier", waiters)
 			if err := bb.Enter(); err != nil {
 				t.Fatalf("could not enter on barrier (%v)", err)
 			}
@@ -86,15 +96,25 @@ func TestDoubleBarrier(t *testing.T) {
 func TestDoubleBarrierFailover(t *testing.T) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
-	defer dropSessionLease(clus)
 
 	waiters := 10
 	donec := make(chan struct{})
 
+	s0, err := concurrency.NewSession(clus.clients[0])
+	if err != nil {
+		t.Error(err)
+	}
+	defer s0.Orphan()
+	s1, err := concurrency.NewSession(clus.clients[0])
+	if err != nil {
+		t.Error(err)
+	}
+	defer s1.Orphan()
+
 	// sacrificial barrier holder; lease will be revoked
 	go func() {
-		b := recipe.NewDoubleBarrier(clus.clients[0], "test-barrier", waiters)
-		if err := b.Enter(); err != nil {
+		b := recipe.NewDoubleBarrier(s0, "test-barrier", waiters)
+		if err = b.Enter(); err != nil {
			t.Fatalf("could not enter on barrier (%v)", err)
 		}
 		donec <- struct{}{}
@@ -102,8 +122,8 @@ func TestDoubleBarrierFailover(t *testing.T) {
 
 	for i := 0; i < waiters-1; i++ {
 		go func() {
-			b := recipe.NewDoubleBarrier(clus.clients[1], "test-barrier", waiters)
-			if err := b.Enter(); err != nil {
+			b := recipe.NewDoubleBarrier(s1, "test-barrier", waiters)
+			if err = b.Enter(); err != nil {
 				t.Fatalf("could not enter on barrier (%v)", err)
 			}
 			donec <- struct{}{}
@@ -120,12 +140,8 @@ func TestDoubleBarrierFailover(t *testing.T) {
 			t.Fatalf("timed out waiting for enter, %d", i)
 		}
 	}
-	// kill lease, expect Leave unblock
-	s, err := concurrency.NewSession(clus.clients[0])
-	if err != nil {
-		t.Fatal(err)
-	}
-	if err = s.Close(); err != nil {
+
+	if err = s0.Close(); err != nil {
 		t.Fatal(err)
 	}
 	// join on rest of waiters
@@ -137,10 +153,3 @@ func TestDoubleBarrierFailover(t *testing.T) {
 		}
 	}
 }
-
-func dropSessionLease(clus *ClusterV3) {
-	for _, client := range clus.clients {
-		s, _ := concurrency.NewSession(client)
-		s.Orphan()
-	}
-}

+ 37 - 14
integration/v3_election_test.go

@@ -28,7 +28,6 @@ import (
 func TestElectionWait(t *testing.T) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
-	defer dropSessionLease(clus)
 
 	leaders := 3
 	followers := 3
@@ -44,7 +43,12 @@ func TestElectionWait(t *testing.T) {
 		nextc = append(nextc, make(chan struct{}))
 		go func(ch chan struct{}) {
 			for j := 0; j < leaders; j++ {
-				b := concurrency.NewElection(newClient(), "test-election")
+				session, err := concurrency.NewSession(newClient())
+				if err != nil {
+					t.Error(err)
+				}
+				b := concurrency.NewElection(session, "test-election")
+
 				cctx, cancel := context.WithCancel(context.TODO())
 				defer cancel()
 				s, ok := <-b.Observe(cctx)
@@ -54,6 +58,7 @@ func TestElectionWait(t *testing.T) {
 				electedc <- string(s.Kvs[0].Value)
 				// wait for next election round
 				<-ch
+				session.Orphan()
 			}
 			donec <- struct{}{}
 		}(nextc[i])
@@ -62,7 +67,13 @@ func TestElectionWait(t *testing.T) {
 	// elect some leaders
 	for i := 0; i < leaders; i++ {
 		go func() {
-			e := concurrency.NewElection(newClient(), "test-election")
+			session, err := concurrency.NewSession(newClient())
+			if err != nil {
+				t.Error(err)
+			}
+			defer session.Orphan()
+
+			e := concurrency.NewElection(session, "test-election")
 			ev := fmt.Sprintf("electval-%v", time.Now().UnixNano())
 			if err := e.Campaign(context.TODO(), ev); err != nil {
 				t.Fatalf("failed volunteer (%v)", err)
@@ -97,13 +108,23 @@ func TestElectionWait(t *testing.T) {
 func TestElectionFailover(t *testing.T) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
-	defer dropSessionLease(clus)
 
 	cctx, cancel := context.WithCancel(context.TODO())
 	defer cancel()
 
+	ss := make([]*concurrency.Session, 3, 3)
+
+	for i := 0; i < 3; i++ {
+		var err error
+		ss[i], err = concurrency.NewSession(clus.clients[i])
+		if err != nil {
+			t.Error(err)
+		}
+		defer ss[i].Orphan()
+	}
+
 	// first leader (elected)
-	e := concurrency.NewElection(clus.clients[0], "test-election")
+	e := concurrency.NewElection(ss[0], "test-election")
 	if err := e.Campaign(context.TODO(), "foo"); err != nil {
 		t.Fatalf("failed volunteer (%v)", err)
 	}
@@ -121,7 +142,7 @@ func TestElectionFailover(t *testing.T) {
 	// next leader
 	electedc := make(chan struct{})
 	go func() {
-		ee := concurrency.NewElection(clus.clients[1], "test-election")
+		ee := concurrency.NewElection(ss[1], "test-election")
 		if eer := ee.Campaign(context.TODO(), "bar"); eer != nil {
 			t.Fatal(eer)
 		}
@@ -129,16 +150,12 @@ func TestElectionFailover(t *testing.T) {
 	}()
 
 	// invoke leader failover
-	session, serr := concurrency.NewSession(clus.clients[0])
-	if serr != nil {
-		t.Fatal(serr)
-	}
-	if err := session.Close(); err != nil {
+	if err := ss[0].Close(); err != nil {
 		t.Fatal(err)
 	}
 
 	// check new leader
-	e = concurrency.NewElection(clus.clients[2], "test-election")
+	e = concurrency.NewElection(ss[2], "test-election")
 	resp, ok = <-e.Observe(cctx)
 	if !ok {
 		t.Fatalf("could not wait for second election; channel closed")
@@ -159,11 +176,17 @@ func TestElectionSessionRecampaign(t *testing.T) {
 	defer clus.Terminate(t)
 	cli := clus.RandClient()
 
-	e := concurrency.NewElection(cli, "test-elect")
+	session, err := concurrency.NewSession(cli)
+	if err != nil {
+		t.Error(err)
+	}
+	defer session.Orphan()
+
+	e := concurrency.NewElection(session, "test-elect")
 	if err := e.Campaign(context.TODO(), "abc"); err != nil {
 		t.Fatal(err)
 	}
-	e2 := concurrency.NewElection(cli, "test-elect")
+	e2 := concurrency.NewElection(session, "test-elect")
 	if err := e2.Campaign(context.TODO(), "def"); err != nil {
 		t.Fatal(err)
 	}

+ 18 - 5
integration/v3_lock_test.go

@@ -49,7 +49,11 @@ func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client)
 	lockedC := make(chan *concurrency.Mutex)
 	for i := 0; i < waiters; i++ {
 		go func() {
-			m := concurrency.NewMutex(chooseClient(), "test-mutex")
+			session, err := concurrency.NewSession(chooseClient())
+			if err != nil {
+				t.Error(err)
+			}
+			m := concurrency.NewMutex(session, "test-mutex")
 			if err := m.Lock(context.TODO()); err != nil {
 				t.Fatalf("could not wait on lock (%v)", err)
 			}
@@ -81,12 +85,17 @@ func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client)
 func TestMutexSessionRelock(t *testing.T) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
-	cli := clus.RandClient()
-	m := concurrency.NewMutex(cli, "test-mutex")
+	session, err := concurrency.NewSession(clus.RandClient())
+	if err != nil {
+		t.Error(err)
+	}
+
+	m := concurrency.NewMutex(session, "test-mutex")
 	if err := m.Lock(context.TODO()); err != nil {
 		t.Fatal(err)
 	}
-	m2 := concurrency.NewMutex(cli, "test-mutex")
+
+	m2 := concurrency.NewMutex(session, "test-mutex")
 	if err := m2.Lock(context.TODO()); err != nil {
 		t.Fatal(err)
 	}
@@ -119,7 +128,11 @@ func testRWMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client
 	wlockedC := make(chan *recipe.RWMutex, 1)
 	for i := 0; i < waiters; i++ {
 		go func() {
-			rwm := recipe.NewRWMutex(chooseClient(), "test-rwmutex")
+			session, err := concurrency.NewSession(chooseClient())
+			if err != nil {
+				t.Error(err)
+			}
+			rwm := recipe.NewRWMutex(session, "test-rwmutex")
 			if rand.Intn(1) == 0 {
 				if err := rwm.RLock(); err != nil {
 					t.Fatalf("could not rlock (%v)", err)

+ 5 - 1
tools/benchmark/cmd/stm.go

@@ -145,7 +145,11 @@ func doSTM(ctx context.Context, client *v3.Client, requests <-chan stmApply) {
 
 	var m *v3sync.Mutex
 	if stmMutex {
-		m = v3sync.NewMutex(client, "stmlock")
+		s, err := v3sync.NewSession(client)
+		if err != nil {
+			panic(err)
+		}
+		m = v3sync.NewMutex(s, "stmlock")
 	}
 
 	for applyf := range requests {

+ 23 - 3
tools/functional-tester/etcd-runner/main.go

@@ -66,7 +66,17 @@ func runElection(eps []string, rounds int) {
 		validateWaiters := 0
 
 		rcs[i].c = randClient(eps)
-		e := concurrency.NewElection(rcs[i].c, "electors")
+		var (
+			s   *concurrency.Session
+			err error
+		)
+		for {
+			s, err = concurrency.NewSession(rcs[i].c)
+			if err == nil {
+				break
+			}
+		}
+		e := concurrency.NewElection(s, "electors")
 
 		rcs[i].acquire = func() error {
 			<-releasec
@@ -79,7 +89,7 @@ func runElection(eps []string, rounds int) {
 					}
 				}
 			}()
-			err := e.Campaign(ctx, v)
+			err = e.Campaign(ctx, v)
 			if err == nil {
 				observedLeader = v
 			}
@@ -173,7 +183,17 @@ func runRacer(eps []string, round int) {
 	cnt := 0
 	for i := range rcs {
 		rcs[i].c = randClient(eps)
-		m := concurrency.NewMutex(rcs[i].c, "racers")
+		var (
+			s   *concurrency.Session
+			err error
+		)
+		for {
+			s, err = concurrency.NewSession(rcs[i].c)
+			if err == nil {
+				break
+			}
+		}
+		m := concurrency.NewMutex(s, "racers")
 		rcs[i].acquire = func() error { return m.Lock(ctx) }
 		rcs[i].validate = func() error {
 			if cnt++; cnt != 1 {