Browse Source

Merge pull request #7248 from ravigadde/session-w-lease

clientv3: start a session with existing lease
Xiang Li 9 years ago
parent
commit
7d6280fa82
2 changed files with 66 additions and 6 deletions
  1. 19 6
      clientv3/concurrency/session.go
  2. 47 0
      integration/v3_election_test.go

+ 19 - 6
clientv3/concurrency/session.go

@@ -41,11 +41,14 @@ func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
 		opt(ops)
 		opt(ops)
 	}
 	}
 
 
-	resp, err := client.Grant(ops.ctx, int64(ops.ttl))
-	if err != nil {
-		return nil, err
+	id := ops.leaseID
+	if id == v3.NoLease {
+		resp, err := client.Grant(ops.ctx, int64(ops.ttl))
+		if err != nil {
+			return nil, err
+		}
+		id = v3.LeaseID(resp.ID)
 	}
 	}
-	id := v3.LeaseID(resp.ID)
 
 
 	ctx, cancel := context.WithCancel(ops.ctx)
 	ctx, cancel := context.WithCancel(ops.ctx)
 	keepAlive, err := client.KeepAlive(ctx, id)
 	keepAlive, err := client.KeepAlive(ctx, id)
@@ -98,8 +101,9 @@ func (s *Session) Close() error {
 }
 }
 
 
 type sessionOptions struct {
 type sessionOptions struct {
-	ttl int
-	ctx context.Context
+	ttl     int
+	leaseID v3.LeaseID
+	ctx     context.Context
 }
 }
 
 
 // SessionOption configures Session.
 // SessionOption configures Session.
@@ -115,6 +119,15 @@ func WithTTL(ttl int) SessionOption {
 	}
 	}
 }
 }
 
 
+// WithLease specifies the existing leaseID to be used for the session.
+// This is useful in process restart scenario, for example, to reclaim
+// leadership from an election prior to restart.
+func WithLease(leaseID v3.LeaseID) SessionOption {
+	return func(so *sessionOptions) {
+		so.leaseID = leaseID
+	}
+}
+
 // WithContext assigns a context to the session instead of defaulting to
 // WithContext assigns a context to the session instead of defaulting to
 // using the client context. This is useful for canceling NewSession and
 // using the client context. This is useful for canceling NewSession and
 // Close operations immediately without having to close the client. If the
 // Close operations immediately without having to close the client. If the

+ 47 - 0
integration/v3_election_test.go

@@ -225,3 +225,50 @@ func TestElectionOnPrefixOfExistingKey(t *testing.T) {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 }
 }
+
+// TestElectionOnSessionRestart tests that a quick restart of leader (resulting
+// in a new session with the same lease id) does not result in loss of
+// leadership.
+func TestElectionOnSessionRestart(t *testing.T) {
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+	cli := clus.RandClient()
+
+	session, err := concurrency.NewSession(cli)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	e := concurrency.NewElection(session, "test-elect")
+	if cerr := e.Campaign(context.TODO(), "abc"); cerr != nil {
+		t.Fatal(cerr)
+	}
+
+	// ensure leader is not lost to waiter on fail-over
+	waitSession, werr := concurrency.NewSession(cli)
+	if werr != nil {
+		t.Fatal(werr)
+	}
+	defer waitSession.Orphan()
+	waitCtx, waitCancel := context.WithTimeout(context.TODO(), 5*time.Second)
+	defer waitCancel()
+	go concurrency.NewElection(waitSession, "test-elect").Campaign(waitCtx, "123")
+
+	// simulate restart by reusing the lease from the old session
+	newSession, nerr := concurrency.NewSession(cli, concurrency.WithLease(session.Lease()))
+	if nerr != nil {
+		t.Fatal(nerr)
+	}
+	defer newSession.Orphan()
+
+	newElection := concurrency.NewElection(newSession, "test-elect")
+	if ncerr := newElection.Campaign(context.TODO(), "def"); ncerr != nil {
+		t.Fatal(ncerr)
+	}
+
+	ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
+	defer cancel()
+	if resp := <-newElection.Observe(ctx); len(resp.Kvs) == 0 || string(resp.Kvs[0].Value) != "def" {
+		t.Errorf("expected value=%q, got response %v", "def", resp)
+	}
+}