Browse Source

Merge pull request #4395 from xiang90/fix_watch

fix watch issues in storage
Xiang Li 10 years ago
parent
commit
6c7ff98b0d
4 changed files with 56 additions and 15 deletions
  1. 4 3
      etcdserver/api/v3rpc/watch.go
  2. 10 12
      storage/watchable_store.go
  3. 39 0
      storage/watchable_store_test.go
  4. 3 0
      storage/watcher.go

+ 4 - 3
etcdserver/api/v3rpc/watch.go

@@ -146,9 +146,10 @@ func (sws *serverWatchStream) sendLoop() {
 			}
 			}
 
 
 			err := sws.gRPCStream.Send(&pb.WatchResponse{
 			err := sws.gRPCStream.Send(&pb.WatchResponse{
-				Header:  sws.newResponseHeader(wresp.Revision),
-				WatchId: int64(wresp.WatchID),
-				Events:  events,
+				Header:    sws.newResponseHeader(wresp.Revision),
+				WatchId:   int64(wresp.WatchID),
+				Events:    events,
+				Compacted: wresp.Compacted,
 			})
 			})
 			storage.ReportEventReceived()
 			storage.ReportEventReceived()
 			if err != nil {
 			if err != nil {

+ 10 - 12
storage/watchable_store.go

@@ -131,7 +131,7 @@ func (s *watchableStore) Put(key, value []byte, lease lease.LeaseID) (rev int64)
 		Type: storagepb.PUT,
 		Type: storagepb.PUT,
 		Kv:   &changes[0],
 		Kv:   &changes[0],
 	}
 	}
-	s.handle(rev, []storagepb.Event{ev})
+	s.notify(rev, []storagepb.Event{ev})
 	return rev
 	return rev
 }
 }
 
 
@@ -156,7 +156,7 @@ func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
 			Type: storagepb.DELETE,
 			Type: storagepb.DELETE,
 			Kv:   &change}
 			Kv:   &change}
 	}
 	}
-	s.handle(rev, evs)
+	s.notify(rev, evs)
 	return n, rev
 	return n, rev
 }
 }
 
 
@@ -191,7 +191,7 @@ func (s *watchableStore) TxnEnd(txnID int64) error {
 		}
 		}
 	}
 	}
 
 
-	s.handle(s.store.Rev(), evs)
+	s.notify(s.store.Rev(), evs)
 	s.mu.Unlock()
 	s.mu.Unlock()
 
 
 	return nil
 	return nil
@@ -299,9 +299,12 @@ func (s *watchableStore) syncWatchers() {
 			}
 			}
 
 
 			if w.cur < compactionRev {
 			if w.cur < compactionRev {
-				// TODO: return error compacted to that watcher instead of
-				// just removing it silently from unsynced.
-				s.unsynced.delete(w)
+				select {
+				case w.ch <- WatchResponse{WatchID: w.id, Compacted: true}:
+					s.unsynced.delete(w)
+				default:
+					// retry next time
+				}
 				continue
 				continue
 			}
 			}
 
 
@@ -324,7 +327,6 @@ func (s *watchableStore) syncWatchers() {
 	tx := s.store.b.BatchTx()
 	tx := s.store.b.BatchTx()
 	tx.Lock()
 	tx.Lock()
 	ks, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
 	ks, vs := tx.UnsafeRange(keyBucketName, minBytes, maxBytes, 0)
-	tx.Unlock()
 
 
 	evs := []storagepb.Event{}
 	evs := []storagepb.Event{}
 
 
@@ -351,6 +353,7 @@ func (s *watchableStore) syncWatchers() {
 
 
 		evs = append(evs, ev)
 		evs = append(evs, ev)
 	}
 	}
+	tx.Unlock()
 
 
 	for w, es := range newWatcherToEventMap(s.unsynced, evs) {
 	for w, es := range newWatcherToEventMap(s.unsynced, evs) {
 		select {
 		select {
@@ -370,11 +373,6 @@ func (s *watchableStore) syncWatchers() {
 	slowWatcherGauge.Set(float64(len(s.unsynced)))
 	slowWatcherGauge.Set(float64(len(s.unsynced)))
 }
 }
 
 
-// handle handles the change of the happening event on all watchers.
-func (s *watchableStore) handle(rev int64, evs []storagepb.Event) {
-	s.notify(rev, evs)
-}
-
 // notify notifies the fact that given event at the given rev just happened to
 // notify notifies the fact that given event at the given rev just happened to
 // watchers that watch on the key of the event.
 // watchers that watch on the key of the event.
 func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {
 func (s *watchableStore) notify(rev int64, evs []storagepb.Event) {

+ 39 - 0
storage/watchable_store_test.go

@@ -19,6 +19,7 @@ import (
 	"os"
 	"os"
 	"reflect"
 	"reflect"
 	"testing"
 	"testing"
+	"time"
 
 
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/storage/backend"
 	"github.com/coreos/etcd/storage/backend"
@@ -215,6 +216,44 @@ func TestSyncWatchers(t *testing.T) {
 	}
 	}
 }
 }
 
 
+// TestWatchCompacted tests a watcher that watches on a compacted revision.
+func TestWatchCompacted(t *testing.T) {
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := newWatchableStore(b, &lease.FakeLessor{})
+
+	defer func() {
+		s.store.Close()
+		os.Remove(tmpPath)
+	}()
+	testKey := []byte("foo")
+	testValue := []byte("bar")
+
+	maxRev := 10
+	compactRev := int64(5)
+	for i := 0; i < maxRev; i++ {
+		s.Put(testKey, testValue, lease.NoLease)
+	}
+	err := s.Compact(compactRev)
+	if err != nil {
+		t.Fatalf("failed to compact kv (%v)", err)
+	}
+
+	w := s.NewWatchStream()
+	wt := w.Watch(testKey, true, compactRev-1)
+
+	select {
+	case resp := <-w.Chan():
+		if resp.WatchID != wt {
+			t.Errorf("resp.WatchID = %x, want %x", resp.WatchID, wt)
+		}
+		if resp.Compacted != true {
+			t.Errorf("resp.Compacted = %v, want %v", resp.Compacted, true)
+		}
+	case <-time.After(1 * time.Second):
+		t.Fatalf("failed to receive response (timeout)")
+	}
+}
+
 func TestNewMapwatcherToEventMap(t *testing.T) {
 func TestNewMapwatcherToEventMap(t *testing.T) {
 	k0, k1, k2 := []byte("foo0"), []byte("foo1"), []byte("foo2")
 	k0, k1, k2 := []byte("foo0"), []byte("foo1"), []byte("foo2")
 	v0, v1, v2 := []byte("bar0"), []byte("bar1"), []byte("bar2")
 	v0, v1, v2 := []byte("bar0"), []byte("bar1"), []byte("bar2")

+ 3 - 0
storage/watcher.go

@@ -67,6 +67,9 @@ type WatchResponse struct {
 	// watcher, the revision is greater than the last modified revision
 	// watcher, the revision is greater than the last modified revision
 	// inside Events.
 	// inside Events.
 	Revision int64
 	Revision int64
+
+	// Compacted is set when the watcher is cancelled due to compaction.
+	Compacted bool
 }
 }
 
 
 // watchStream contains a collection of watchers that share
 // watchStream contains a collection of watchers that share