Browse Source

storage: send compaction

Xiang Li 10 years ago
parent
commit
52416fafb0
3 changed files with 48 additions and 3 deletions
  1. 6 3
      storage/watchable_store.go
  2. 39 0
      storage/watchable_store_test.go
  3. 3 0
      storage/watcher.go

+ 6 - 3
storage/watchable_store.go

@@ -299,9 +299,12 @@ func (s *watchableStore) syncWatchers() {
 			}
 			}
 
 
 			if w.cur < compactionRev {
 			if w.cur < compactionRev {
-				// TODO: return error compacted to that watcher instead of
-				// just removing it silently from unsynced.
-				s.unsynced.delete(w)
+				select {
+				case w.ch <- WatchResponse{WatchID: w.id, Compacted: true}:
+					s.unsynced.delete(w)
+				default:
+					// retry next time
+				}
 				continue
 				continue
 			}
 			}
 
 

+ 39 - 0
storage/watchable_store_test.go

@@ -19,6 +19,7 @@ import (
 	"os"
 	"os"
 	"reflect"
 	"reflect"
 	"testing"
 	"testing"
+	"time"
 
 
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/lease"
 	"github.com/coreos/etcd/storage/backend"
 	"github.com/coreos/etcd/storage/backend"
@@ -215,6 +216,44 @@ func TestSyncWatchers(t *testing.T) {
 	}
 	}
 }
 }
 
 
+// TestWatchCompacted tests a watcher that watches on a compacted revision.
+func TestWatchCompacted(t *testing.T) {
+	b, tmpPath := backend.NewDefaultTmpBackend()
+	s := newWatchableStore(b, &lease.FakeLessor{})
+
+	defer func() {
+		s.store.Close()
+		os.Remove(tmpPath)
+	}()
+	testKey := []byte("foo")
+	testValue := []byte("bar")
+
+	maxRev := 10
+	compactRev := int64(5)
+	for i := 0; i < maxRev; i++ {
+		s.Put(testKey, testValue, lease.NoLease)
+	}
+	err := s.Compact(compactRev)
+	if err != nil {
+		t.Fatalf("failed to compact kv (%v)", err)
+	}
+
+	w := s.NewWatchStream()
+	wt := w.Watch(testKey, true, compactRev-1)
+
+	select {
+	case resp := <-w.Chan():
+		if resp.WatchID != wt {
+			t.Errorf("resp.WatchID = %x, want %x", resp.WatchID, wt)
+		}
+		if resp.Compacted != true {
+			t.Errorf("resp.Compacted = %v, want %v", resp.Compacted, true)
+		}
+	case <-time.After(1 * time.Second):
+		t.Fatalf("failed to receive response (timeout)")
+	}
+}
+
 func TestNewMapwatcherToEventMap(t *testing.T) {
 func TestNewMapwatcherToEventMap(t *testing.T) {
 	k0, k1, k2 := []byte("foo0"), []byte("foo1"), []byte("foo2")
 	k0, k1, k2 := []byte("foo0"), []byte("foo1"), []byte("foo2")
 	v0, v1, v2 := []byte("bar0"), []byte("bar1"), []byte("bar2")
 	v0, v1, v2 := []byte("bar0"), []byte("bar1"), []byte("bar2")

+ 3 - 0
storage/watcher.go

@@ -67,6 +67,9 @@ type WatchResponse struct {
 	// watcher, the revision is greater than the last modified revision
 	// watcher, the revision is greater than the last modified revision
 	// inside Events.
 	// inside Events.
 	Revision int64
 	Revision int64
+
+	// Compacted is set when the watcher is cancelled due to compaction.
+	Compacted bool
 }
 }
 
 
 // watchStream contains a collection of watchers that share
 // watchStream contains a collection of watchers that share