Browse Source

Merge pull request #4262 from heyitsanthony/v3-lease-watch-expire

storage: publish delete events on lease revocation
Anthony Romano 10 years ago
parent
commit
6c240b4037
2 changed files with 53 additions and 0 deletions
  1. 49 0
      integration/v3_grpc_test.go
  2. 4 0
      storage/watchable_store.go

+ 49 - 0
integration/v3_grpc_test.go

@@ -1057,6 +1057,55 @@ func TestV3LeaseCreateByID(t *testing.T) {
 
 }
 
+// TestV3LeaseExpire ensures a key is deleted once a key expires.
+func TestV3LeaseExpire(t *testing.T) {
+	testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {
+		// let lease lapse; wait for deleted key
+
+		wAPI := pb.NewWatchClient(clus.RandConn())
+		wStream, err := wAPI.Watch(context.TODO())
+		if err != nil {
+			return err
+		}
+
+		creq := &pb.WatchCreateRequest{Key: []byte("foo"), StartRevision: 1}
+		wreq := &pb.WatchRequest{CreateRequest: creq}
+		if err := wStream.Send(wreq); err != nil {
+			return err
+		}
+		if _, err := wStream.Recv(); err != nil {
+			// the 'created' message
+			return err
+		}
+		if _, err := wStream.Recv(); err != nil {
+			// the 'put' message
+			return err
+		}
+
+		errc := make(chan error, 1)
+		go func() {
+			resp, err := wStream.Recv()
+			switch {
+			case err != nil:
+				errc <- err
+			case len(resp.Events) != 1:
+				fallthrough
+			case resp.Events[0].Type != storagepb.DELETE:
+				errc <- fmt.Errorf("expected key delete, got %v", resp)
+			default:
+				errc <- nil
+			}
+		}()
+
+		select {
+		case <-time.After(15 * time.Second):
+			return fmt.Errorf("lease expiration too slow")
+		case err := <-errc:
+			return err
+		}
+	})
+}
+
 // TestV3LeaseKeepAlive ensures keepalive keeps the lease alive.
 func TestV3LeaseKeepAlive(t *testing.T) {
 	testLeaseRemoveLeasedKey(t, func(clus *clusterV3, leaseID int64) error {

+ 4 - 0
storage/watchable_store.go

@@ -68,6 +68,10 @@ func newWatchableStore(b backend.Backend, le lease.Lessor) *watchableStore {
 		synced:   make(map[string]map[*watcher]struct{}),
 		stopc:    make(chan struct{}),
 	}
+	if s.le != nil {
+		// use this store as the deleter so revokes trigger watch events
+		s.le.SetRangeDeleter(s)
+	}
 	s.wg.Add(1)
 	go s.syncWatchersLoop()
 	return s