Sfoglia il codice sorgente

clientv3: include a context in Client

Useful for clean up tasks
Anthony Romano 9 anni fa
parent
commit
360aafec76

+ 24 - 3
clientv3/client.go

@@ -45,6 +45,9 @@ type Client struct {
 	creds  *credentials.TransportAuthenticator
 	mu     sync.RWMutex // protects connection selection and error list
 	errors []error      // errors passed to retryConnection
+
+	ctx    context.Context
+	cancel context.CancelFunc
 }
 
 // EndpointDialer is a policy for choosing which endpoint to dial next
@@ -83,11 +86,23 @@ func NewFromURL(url string) (*Client, error) {
 
 // Close shuts down the client's etcd connections.
 func (c *Client) Close() error {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if c.cancel == nil {
+		return nil
+	}
+	c.cancel()
+	c.cancel = nil
 	c.Watcher.Close()
 	c.Lease.Close()
 	return c.conn.Close()
 }
 
+// Ctx is a context for "out of band" messages (e.g., for sending
+// "clean up" message when another context is canceled). It is
+// canceled on client Close().
+func (c *Client) Ctx() context.Context { return c.ctx }
+
 // Endpoints lists the registered endpoints for the client.
 func (c *Client) Endpoints() []string { return c.cfg.Endpoints }
 
@@ -145,10 +160,13 @@ func newClient(cfg *Config) (*Client, error) {
 	if err != nil {
 		return nil, err
 	}
+	ctx, cancel := context.WithCancel(context.TODO())
 	client := &Client{
-		conn:  conn,
-		cfg:   *cfg,
-		creds: creds,
+		conn:   conn,
+		cfg:    *cfg,
+		creds:  creds,
+		ctx:    ctx,
+		cancel: cancel,
 	}
 	client.Cluster = NewCluster(client)
 	client.KV = NewKV(client)
@@ -173,6 +191,9 @@ func (c *Client) retryConnection(oldConn *grpc.ClientConn, err error) (*grpc.Cli
 	if err != nil {
 		c.errors = append(c.errors, err)
 	}
+	if c.cancel == nil {
+		return nil, c.ctx.Err()
+	}
 	if oldConn != c.conn {
 		// conn has already been updated
 		return c.conn, nil

+ 5 - 6
clientv3/concurrency/election.go

@@ -29,7 +29,6 @@ var (
 
 type Election struct {
 	client *v3.Client
-	ctx    context.Context
 
 	keyPrefix string
 
@@ -39,8 +38,8 @@ type Election struct {
 }
 
 // NewElection returns a new election on a given key prefix.
-func NewElection(ctx context.Context, client *v3.Client, pfx string) *Election {
-	return &Election{client: client, ctx: ctx, keyPrefix: pfx}
+func NewElection(client *v3.Client, pfx string) *Election {
+	return &Election{client: client, keyPrefix: pfx}
 }
 
 // Campaign puts a value as eligible for the election. It blocks until
@@ -60,7 +59,7 @@ func (e *Election) Campaign(ctx context.Context, val string) error {
 		// clean up in case of context cancel
 		select {
 		case <-ctx.Done():
-			e.client.Delete(e.ctx, k)
+			e.client.Delete(e.client.Ctx(), k)
 		default:
 		}
 		return err
@@ -94,7 +93,7 @@ func (e *Election) Resign() (err error) {
 	if e.leaderSession == nil {
 		return nil
 	}
-	_, err = e.client.Delete(e.ctx, e.leaderKey)
+	_, err = e.client.Delete(e.client.Ctx(), e.leaderKey)
 	e.leaderKey = ""
 	e.leaderSession = nil
 	return err
@@ -102,7 +101,7 @@ func (e *Election) Resign() (err error) {
 
 // Leader returns the leader value for the current election.
 func (e *Election) Leader() (string, error) {
-	resp, err := e.client.Get(e.ctx, e.keyPrefix, v3.WithFirstCreate()...)
+	resp, err := e.client.Get(e.client.Ctx(), e.keyPrefix, v3.WithFirstCreate()...)
 	if err != nil {
 		return "", err
 	} else if len(resp.Kvs) == 0 {

+ 6 - 7
clientv3/concurrency/mutex.go

@@ -24,15 +24,14 @@ import (
 // Mutex implements the sync Locker interface with etcd
 type Mutex struct {
 	client *v3.Client
-	ctx    context.Context
 
 	pfx   string
 	myKey string
 	myRev int64
 }
 
-func NewMutex(ctx context.Context, client *v3.Client, pfx string) *Mutex {
-	return &Mutex{client, ctx, pfx, "", -1}
+func NewMutex(client *v3.Client, pfx string) *Mutex {
+	return &Mutex{client, pfx, "", -1}
 }
 
 // Lock locks the mutex with a cancellable context. If the context is cancelled
@@ -56,7 +55,7 @@ func (m *Mutex) Lock(ctx context.Context) error {
 }
 
 func (m *Mutex) Unlock() error {
-	if _, err := m.client.Delete(m.ctx, m.myKey); err != nil {
+	if _, err := m.client.Delete(m.client.Ctx(), m.myKey); err != nil {
 		return err
 	}
 	m.myKey = "\x00"
@@ -73,7 +72,7 @@ func (m *Mutex) Key() string { return m.myKey }
 type lockerMutex struct{ *Mutex }
 
 func (lm *lockerMutex) Lock() {
-	if err := lm.Mutex.Lock(lm.ctx); err != nil {
+	if err := lm.Mutex.Lock(lm.client.Ctx()); err != nil {
 		panic(err)
 	}
 }
@@ -84,6 +83,6 @@ func (lm *lockerMutex) Unlock() {
 }
 
 // NewLocker creates a sync.Locker backed by an etcd mutex.
-func NewLocker(ctx context.Context, client *v3.Client, pfx string) sync.Locker {
-	return &lockerMutex{NewMutex(ctx, client, pfx)}
+func NewLocker(client *v3.Client, pfx string) sync.Locker {
+	return &lockerMutex{NewMutex(client, pfx)}
 }

+ 3 - 3
clientv3/concurrency/session.go

@@ -49,13 +49,13 @@ func NewSession(client *v3.Client) (*Session, error) {
 		return s, nil
 	}
 
-	resp, err := client.Create(context.TODO(), sessionTTL)
+	resp, err := client.Create(client.Ctx(), sessionTTL)
 	if err != nil {
 		return nil, err
 	}
 	id := lease.LeaseID(resp.ID)
 
-	ctx, cancel := context.WithCancel(context.Background())
+	ctx, cancel := context.WithCancel(client.Ctx())
 	keepAlive, err := client.KeepAlive(ctx, id)
 	if err != nil || keepAlive == nil {
 		return nil, err
@@ -99,6 +99,6 @@ func (s *Session) Orphan() {
 // Close orphans the session and revokes the session lease.
 func (s *Session) Close() error {
 	s.Orphan()
-	_, err := s.client.Revoke(context.TODO(), s.id)
+	_, err := s.client.Revoke(s.client.Ctx(), s.id)
 	return err
 }

+ 2 - 2
etcdctlv3/command/elect_command.go

@@ -64,7 +64,7 @@ func electCommandFunc(cmd *cobra.Command, args []string) {
 }
 
 func observe(c *clientv3.Client, election string) error {
-	e := concurrency.NewElection(context.TODO(), c, election)
+	e := concurrency.NewElection(c, election)
 	ctx, cancel := context.WithCancel(context.TODO())
 
 	donec := make(chan struct{})
@@ -94,7 +94,7 @@ func observe(c *clientv3.Client, election string) error {
 }
 
 func campaign(c *clientv3.Client, election string, prop string) error {
-	e := concurrency.NewElection(context.TODO(), c, election)
+	e := concurrency.NewElection(c, election)
 	ctx, cancel := context.WithCancel(context.TODO())
 
 	donec := make(chan struct{})

+ 1 - 1
etcdctlv3/command/lock_command.go

@@ -46,7 +46,7 @@ func lockCommandFunc(cmd *cobra.Command, args []string) {
 }
 
 func lockUntilSignal(c *clientv3.Client, lockname string) error {
-	m := concurrency.NewMutex(context.TODO(), c, lockname)
+	m := concurrency.NewMutex(c, lockname)
 	ctx, cancel := context.WithCancel(context.TODO())
 
 	// unlock in case of ordinary shutdown

+ 5 - 5
integration/v3_election_test.go

@@ -40,7 +40,7 @@ func TestElectionWait(t *testing.T) {
 		nextc = append(nextc, make(chan struct{}))
 		go func(ch chan struct{}) {
 			for j := 0; j < leaders; j++ {
-				b := concurrency.NewElection(context.TODO(), clus.RandClient(), "test-election")
+				b := concurrency.NewElection(clus.RandClient(), "test-election")
 				cctx, cancel := context.WithCancel(context.TODO())
 				defer cancel()
 				s, ok := <-b.Observe(cctx)
@@ -58,7 +58,7 @@ func TestElectionWait(t *testing.T) {
 	// elect some leaders
 	for i := 0; i < leaders; i++ {
 		go func() {
-			e := concurrency.NewElection(context.TODO(), clus.RandClient(), "test-election")
+			e := concurrency.NewElection(clus.RandClient(), "test-election")
 			ev := fmt.Sprintf("electval-%v", time.Now().UnixNano())
 			if err := e.Campaign(context.TODO(), ev); err != nil {
 				t.Fatalf("failed volunteer (%v)", err)
@@ -97,7 +97,7 @@ func TestElectionFailover(t *testing.T) {
 	defer cancel()
 
 	// first leader (elected)
-	e := concurrency.NewElection(context.TODO(), clus.clients[0], "test-election")
+	e := concurrency.NewElection(clus.clients[0], "test-election")
 	if err := e.Campaign(context.TODO(), "foo"); err != nil {
 		t.Fatalf("failed volunteer (%v)", err)
 	}
@@ -115,7 +115,7 @@ func TestElectionFailover(t *testing.T) {
 	// next leader
 	electedc := make(chan struct{})
 	go func() {
-		ee := concurrency.NewElection(context.TODO(), clus.clients[1], "test-election")
+		ee := concurrency.NewElection(clus.clients[1], "test-election")
 		if eer := ee.Campaign(context.TODO(), "bar"); eer != nil {
 			t.Fatal(eer)
 		}
@@ -132,7 +132,7 @@ func TestElectionFailover(t *testing.T) {
 	}
 
 	// check new leader
-	e = concurrency.NewElection(context.TODO(), clus.clients[2], "test-election")
+	e = concurrency.NewElection(clus.clients[2], "test-election")
 	resp, ok = <-e.Observe(cctx)
 	if !ok {
 		t.Fatalf("could not wait for second election; channel closed")

+ 1 - 1
integration/v3_lock_test.go

@@ -41,7 +41,7 @@ func testMutex(t *testing.T, waiters int, chooseClient func() *clientv3.Client)
 	lockedC := make(chan *concurrency.Mutex, 1)
 	for i := 0; i < waiters; i++ {
 		go func() {
-			m := concurrency.NewMutex(context.TODO(), chooseClient(), "test-mutex")
+			m := concurrency.NewMutex(chooseClient(), "test-mutex")
 			if err := m.Lock(context.TODO()); err != nil {
 				t.Fatalf("could not wait on lock (%v)", err)
 			}