Bladeren bron

clientv3/concurrency: Session

A client may bind itself to a session lease to signal its
continued in participation with the cluster.
Anthony Romano 9 jaren geleden
bovenliggende
commit
20b4336cdb

+ 106 - 0
clientv3/concurrency/session.go

@@ -0,0 +1,106 @@
+// Copyright 2016 CoreOS, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package concurrency
+
+import (
+	"sync"
+
+	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
+	v3 "github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/lease"
+)
+
+// only keep one ephemeral lease per client
+var clientSessions clientSessionMgr = clientSessionMgr{sessions: make(map[*v3.Client]*Session)}
+
+const sessionTTL = 60
+
+type clientSessionMgr struct {
+	sessions map[*v3.Client]*Session
+	mu       sync.Mutex
+}
+
+// Session represents a lease kept alive for the lifetime of a client.
+// Fault-tolerant applications may use sessions to reason about liveness.
+type Session struct {
+	client *v3.Client
+	id     lease.LeaseID
+
+	cancel context.CancelFunc
+	donec  <-chan struct{}
+}
+
+// NewSession gets the leased session for a client.
+func NewSession(client *v3.Client) (*Session, error) {
+	clientSessions.mu.Lock()
+	defer clientSessions.mu.Unlock()
+	if s, ok := clientSessions.sessions[client]; ok {
+		return s, nil
+	}
+
+	lc := v3.NewLease(client)
+	resp, err := lc.Create(context.TODO(), sessionTTL)
+	if err != nil {
+		return nil, err
+	}
+	id := lease.LeaseID(resp.ID)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	keepAlive, err := lc.KeepAlive(ctx, id)
+	if err != nil || keepAlive == nil {
+		return nil, err
+	}
+
+	donec := make(chan struct{})
+	s := &Session{client: client, id: id, cancel: cancel, donec: donec}
+	clientSessions.sessions[client] = s
+
+	// keep the lease alive until client error or cancelled context
+	go func() {
+		defer func() {
+			clientSessions.mu.Lock()
+			delete(clientSessions.sessions, client)
+			clientSessions.mu.Unlock()
+			lc.Close()
+			close(donec)
+		}()
+		for range keepAlive {
+			// eat messages until keep alive channel closes
+		}
+	}()
+
+	return s, nil
+}
+
+// Lease is the lease ID for keys bound to the session.
+func (s *Session) Lease() lease.LeaseID { return s.id }
+
+// Done returns a channel that closes when the lease is orphaned, expires, or
+// is otherwise no longer being refreshed.
+func (s *Session) Done() <-chan struct{} { return s.donec }
+
+// Orphan ends the refresh for the session lease. This is useful
+// in case the state of the client connection is indeterminate (revoke
+// would fail) or when transferring lease ownership.
+func (s *Session) Orphan() {
+	s.cancel()
+	<-s.donec
+}
+
+// Close orphans the session and revokes the session lease.
+func (s *Session) Close() error {
+	s.Orphan()
+	_, err := v3.NewLease(s.client).Revoke(context.TODO(), s.id)
+	return err
+}

+ 3 - 2
contrib/recipes/key.go

@@ -21,6 +21,7 @@ import (
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
 	v3 "github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/clientv3/concurrency"
 	"github.com/coreos/etcd/lease"
 )
 
@@ -161,11 +162,11 @@ type EphemeralKV struct{ RemoteKV }
 
 // NewEphemeralKV creates a new key/value pair associated with a session lease
 func NewEphemeralKV(client *v3.Client, key, val string) (*EphemeralKV, error) {
-	leaseID, err := SessionLease(client)
+	s, err := concurrency.NewSession(client)
 	if err != nil {
 		return nil, err
 	}
-	k, err := NewKV(v3.NewKV(client), key, val, leaseID)
+	k, err := NewKV(v3.NewKV(client), key, val, s.Lease())
 	if err != nil {
 		return nil, err
 	}

+ 0 - 113
contrib/recipes/lease.go

@@ -1,113 +0,0 @@
-// Copyright 2016 CoreOS, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-package recipe
-
-import (
-	"sync"
-
-	"github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
-	"github.com/coreos/etcd/clientv3"
-	"github.com/coreos/etcd/lease"
-)
-
-// only keep one ephemeral lease per client
-var clientLeases clientLeaseMgr = clientLeaseMgr{leases: make(map[*clientv3.Client]*leaseKeepAlive)}
-
-type clientLeaseMgr struct {
-	leases map[*clientv3.Client]*leaseKeepAlive
-	mu     sync.Mutex
-}
-
-type leaseKeepAlive struct {
-	id     lease.LeaseID
-	cancel context.CancelFunc
-	donec  <-chan struct{}
-}
-
-func SessionLease(client *clientv3.Client) (lease.LeaseID, error) {
-	return clientLeases.sessionLease(client, 120)
-}
-
-func SessionLeaseTTL(client *clientv3.Client, ttl int64) (lease.LeaseID, error) {
-	return clientLeases.sessionLease(client, ttl)
-}
-
-// StopSessionLease ends the refresh for the session lease. This is useful
-// in case the state of the client connection is indeterminate (revoke
-// would fail) or if transferring lease ownership.
-func StopSessionLease(client *clientv3.Client) {
-	clientLeases.mu.Lock()
-	lka := clientLeases.leases[client]
-	clientLeases.mu.Unlock()
-	if lka != nil {
-		lka.cancel()
-		<-lka.donec
-	}
-}
-
-// RevokeSessionLease revokes the session lease.
-func RevokeSessionLease(client *clientv3.Client) (err error) {
-	clientLeases.mu.Lock()
-	lka := clientLeases.leases[client]
-	clientLeases.mu.Unlock()
-	StopSessionLease(client)
-	if lka != nil {
-		_, err = clientv3.NewLease(client).Revoke(context.TODO(), lka.id)
-	}
-	return err
-}
-
-func (clm *clientLeaseMgr) sessionLease(client *clientv3.Client, ttl int64) (lease.LeaseID, error) {
-	clm.mu.Lock()
-	defer clm.mu.Unlock()
-	if lka, ok := clm.leases[client]; ok {
-		return lka.id, nil
-	}
-
-	lc := clientv3.NewLease(client)
-	resp, err := lc.Create(context.TODO(), ttl)
-	if err != nil {
-		return lease.NoLease, err
-	}
-	id := lease.LeaseID(resp.ID)
-
-	ctx, cancel := context.WithCancel(context.Background())
-	keepAlive, err := lc.KeepAlive(ctx, id)
-	if err != nil || keepAlive == nil {
-		return lease.NoLease, err
-	}
-
-	donec := make(chan struct{})
-	lka := &leaseKeepAlive{
-		id:     id,
-		cancel: cancel,
-		donec:  donec}
-	clm.leases[client] = lka
-
-	// keep the lease alive until client error or cancelled context
-	go func() {
-		defer func() {
-			clm.mu.Lock()
-			delete(clm.leases, client)
-			clm.mu.Unlock()
-			lc.Close()
-			close(donec)
-		}()
-		for range keepAlive {
-			// eat messages until keep alive channel closes
-		}
-	}()
-
-	return id, nil
-}

+ 13 - 5
integration/v3_double_barrier_test.go

@@ -17,13 +17,14 @@ import (
 	"testing"
 	"time"
 
+	"github.com/coreos/etcd/clientv3/concurrency"
 	"github.com/coreos/etcd/contrib/recipes"
 )
 
 func TestDoubleBarrier(t *testing.T) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
-	defer closeSessionLease(clus)
+	defer dropSessionLease(clus)
 
 	waiters := 10
 
@@ -84,7 +85,7 @@ func TestDoubleBarrier(t *testing.T) {
 func TestDoubleBarrierFailover(t *testing.T) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
-	defer closeSessionLease(clus)
+	defer dropSessionLease(clus)
 
 	waiters := 10
 	donec := make(chan struct{})
@@ -119,7 +120,13 @@ func TestDoubleBarrierFailover(t *testing.T) {
 		}
 	}
 	// kill lease, expect Leave unblock
-	recipe.RevokeSessionLease(clus.clients[0])
+	s, err := concurrency.NewSession(clus.clients[0])
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err = s.Close(); err != nil {
+		t.Fatal(err)
+	}
 	// join on rest of waiters
 	for i := 0; i < waiters-1; i++ {
 		select {
@@ -130,8 +137,9 @@ func TestDoubleBarrierFailover(t *testing.T) {
 	}
 }
 
-func closeSessionLease(clus *ClusterV3) {
+func dropSessionLease(clus *ClusterV3) {
 	for _, client := range clus.clients {
-		recipe.StopSessionLease(client)
+		s, _ := concurrency.NewSession(client)
+		s.Orphan()
 	}
 }

+ 8 - 3
integration/v3_election_test.go

@@ -18,6 +18,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/coreos/etcd/clientv3/concurrency"
 	"github.com/coreos/etcd/contrib/recipes"
 )
 
@@ -25,7 +26,7 @@ import (
 func TestElectionWait(t *testing.T) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
-	defer closeSessionLease(clus)
+	defer dropSessionLease(clus)
 
 	leaders := 3
 	followers := 3
@@ -88,7 +89,7 @@ func TestElectionWait(t *testing.T) {
 func TestElectionFailover(t *testing.T) {
 	clus := NewClusterV3(t, &ClusterConfig{Size: 3})
 	defer clus.Terminate(t)
-	defer closeSessionLease(clus)
+	defer dropSessionLease(clus)
 
 	// first leader (elected)
 	e := recipe.NewElection(clus.clients[0], "test-election")
@@ -116,7 +117,11 @@ func TestElectionFailover(t *testing.T) {
 	}()
 
 	// invoke leader failover
-	err = recipe.RevokeSessionLease(clus.clients[0])
+	session, serr := concurrency.NewSession(clus.clients[0])
+	if serr != nil {
+		t.Fatal(serr)
+	}
+	err = session.Close()
 	if err != nil {
 		t.Fatal(err)
 	}