
storage: introduce WatchableKV and watch feature

WatchableKV is an interface built on top of KV that supports the watch feature.
Yicheng Qin, 10 years ago
commit ec43e0a4c3

+ 34 - 0
storage/index.go

@@ -2,6 +2,7 @@ package storage
 
 import (
 	"log"
+	"sort"
 	"sync"
 
 	"github.com/coreos/etcd/Godeps/_workspace/src/github.com/google/btree"
@@ -13,6 +14,7 @@ type index interface {
 	Put(key []byte, rev revision)
 	Restore(key []byte, created, modified revision, ver int64)
 	Tombstone(key []byte, rev revision) error
+	RangeEvents(key, end []byte, rev int64) []revision
 	Compact(rev int64) map[revision]struct{}
 	Equal(b index) bool
 }
@@ -118,6 +120,38 @@ func (ti *treeIndex) Tombstone(key []byte, rev revision) error {
 	return ki.tombstone(rev.main, rev.sub)
 }
 
+// RangeEvents returns all revisions from key (inclusive) to end (exclusive)
+// at or after the given rev. The returned slice is sorted in the order
+// of revision.
+func (ti *treeIndex) RangeEvents(key, end []byte, rev int64) []revision {
+	ti.RLock()
+	defer ti.RUnlock()
+
+	keyi := &keyIndex{key: key}
+	if end == nil {
+		item := ti.tree.Get(keyi)
+		if item == nil {
+			return nil
+		}
+		keyi = item.(*keyIndex)
+		return keyi.since(rev)
+	}
+
+	endi := &keyIndex{key: end}
+	var revs []revision
+	ti.tree.AscendGreaterOrEqual(keyi, func(item btree.Item) bool {
+		if !item.Less(endi) {
+			return false
+		}
+		curKeyi := item.(*keyIndex)
+		revs = append(revs, curKeyi.since(rev)...)
+		return true
+	})
+	sort.Sort(revisions(revs))
+
+	return revs
+}
+
 func (ti *treeIndex) Compact(rev int64) map[revision]struct{} {
 	available := make(map[revision]struct{})
 	emptyki := make([]*keyIndex, 0)
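
To make the half-open range and the revision ordering concrete, here is a minimal in-package sketch; it assumes the package's existing newTreeIndex constructor and the revision struct used above, with illustrative keys and revisions:

func rangeEventsIndexSketch() {
	ti := newTreeIndex()
	ti.Put([]byte("foo"), revision{main: 1})
	ti.Put([]byte("foo1"), revision{main: 2})
	ti.Put([]byte("foo2"), revision{main: 3})

	// end == nil observes only the single key "foo": [{1 0}]
	single := ti.RangeEvents([]byte("foo"), nil, 1)
	// [foo, foo2) at rev >= 2 skips foo (rev 1) and excludes foo2: [{2 0}]
	ranged := ti.RangeEvents([]byte("foo"), []byte("foo2"), 2)
	_, _ = single, ranged
}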

+ 35 - 0
storage/key_index.go

@@ -132,6 +132,41 @@ func (ki *keyIndex) get(atRev int64) (modified, created revision, ver int64, err
 	return revision{}, revision{}, 0, ErrRevisionNotFound
 }
 
+// since returns revisions since the given rev. Only the revision with the
+// largest sub revision will be returned if multiple revisions have the same
+// main revision.
+func (ki *keyIndex) since(rev int64) []revision {
+	if ki.isEmpty() {
+		log.Panicf("store.keyindex: unexpected get on empty keyIndex %s", string(ki.key))
+	}
+	since := revision{rev, 0}
+	var gi int
+	// find the generations to start checking
+	for gi = len(ki.generations) - 1; gi > 0; gi-- {
+		if since.GreaterThan(ki.generations[gi].created) {
+			break
+		}
+	}
+
+	var revs []revision
+	var last int64
+	for ; gi < len(ki.generations); gi++ {
+		for _, r := range ki.generations[gi].revs {
+			if since.GreaterThan(r) {
+				continue
+			}
+			if r.main == last {
+				// replace the revision with a new one that has higher sub value,
+				// because the original one should not be seen by external
+				revs[len(revs)-1] = r
+				continue
+			}
+			revs = append(revs, r)
+			last = r.main
+		}
+	}
+	return revs
+}
+
 // compact compacts a keyIndex by removing the versions with smaller or equal
 // revision than the given atRev except the largest one (If the largest one is
 // a tombstone, it will not be kept).
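
A small sketch of the dedup-by-main-revision behaviour described above; it assumes the keyIndex put helper used elsewhere in this package, with illustrative revisions:

func sinceSketch() []revision {
	ki := &keyIndex{key: []byte("foo")}
	ki.put(2, 0)
	ki.put(2, 1) // same main revision, higher sub (e.g. a later write in the same txn)
	ki.put(3, 0)
	// only the revision with the largest sub for main revision 2 survives: [{2 1} {3 0}]
	return ki.since(2)
}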

+ 35 - 0
storage/kv.go

@@ -6,6 +6,10 @@ import (
 	"github.com/coreos/etcd/storage/storagepb"
 )
 
+// CancelFunc tells an operation to abandon its work. A CancelFunc does not
+// wait for the work to stop.
+type CancelFunc func()
+
 type KV interface {
 	// Range gets the keys in the range at rangeRev.
 	// If rangeRev <=0, range gets the keys at currentRev.
@@ -46,3 +50,34 @@ type KV interface {
 	Restore() error
 	Close() error
 }
+
+// Watcher watches on the KV. It is notified when an event happens on the
+// watched key or prefix.
+type Watcher interface {
+	// Event returns a channel that receives observed events matching the
+	// context of the watcher. When the watch finishes, is canceled, or is
+	// aborted, the channel is closed and yields an empty event.
+	// Successive calls to Event return the same value.
+	Event() <-chan storagepb.Event
+
+	// Err returns a non-nil error value after Event is closed. Err returns
+	// ErrCompacted if the history was compacted, ErrCanceled if the watch was
+	// canceled, or ExceedEnd if the watch reached its end revision. No other
+	// values for Err are defined.
+	// After Event is closed, successive calls to Err return the same value.
+	Err() error
+}
+
+// WatchableKV is a KV that can be watched.
+type WatchableKV interface {
+	KV
+
+	// Watcher watches the events that are happening or have happened in etcd.
+	// The whole event history can be watched unless it has been compacted.
+	// If `prefix` is true, watch observes all events whose key has the given `key` as a prefix.
+	// If `startRev` <= 0, watch observes events after currentRev.
+	// If `endRev` <= 0, watch observes events until the watch is canceled.
+	//
+	// Canceling the watcher releases resources associated with it, so code
+	// should always call cancel as soon as watch is done.
+	Watcher(key []byte, prefix bool, startRev, endRev int64) (Watcher, CancelFunc)
+}
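
A hedged usage sketch from inside the storage package, using the constructor and methods added below in watchable_store.go (the database path is illustrative):

func watchSketch() {
	s := newWatchableStore("test.db")
	defer s.Close()

	// watch every key with prefix "foo", starting from future events
	w, cancel := s.Watcher([]byte("foo"), true, 0, 0)
	defer cancel()

	s.Put([]byte("foo1"), []byte("bar1"))

	if ev, ok := <-w.Event(); ok {
		_ = ev // ev.Type == storagepb.PUT, ev.Kv.Key == "foo1"
	}
	// once the channel is closed, Err reports why the watch ended
	// (ErrCompacted, ErrCanceled, or ExceedEnd).
	_ = w.Err()
}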

+ 103 - 0
storage/kv_test.go

@@ -691,6 +691,109 @@ func TestKVSnapshot(t *testing.T) {
 	}
 }
 
+func TestWatchableKVWatch(t *testing.T) {
+	s := newWatchableStore(tmpPath)
+	defer cleanup(s, tmpPath)
+
+	wa, cancel := s.Watcher([]byte("foo"), true, 0, 0)
+	defer cancel()
+
+	s.Put([]byte("foo"), []byte("bar"))
+	select {
+	case ev := <-wa.Event():
+		wev := storagepb.Event{
+			Type: storagepb.PUT,
+			Kv: &storagepb.KeyValue{
+				Key:            []byte("foo"),
+				Value:          []byte("bar"),
+				CreateRevision: 1,
+				ModRevision:    1,
+				Version:        1,
+			},
+		}
+		if !reflect.DeepEqual(ev, wev) {
+			t.Errorf("watched event = %+v, want %+v", ev, wev)
+		}
+	case <-time.After(time.Second):
+		t.Fatalf("failed to watch the event")
+	}
+
+	s.Put([]byte("foo1"), []byte("bar1"))
+	select {
+	case ev := <-wa.Event():
+		wev := storagepb.Event{
+			Type: storagepb.PUT,
+			Kv: &storagepb.KeyValue{
+				Key:            []byte("foo1"),
+				Value:          []byte("bar1"),
+				CreateRevision: 2,
+				ModRevision:    2,
+				Version:        1,
+			},
+		}
+		if !reflect.DeepEqual(ev, wev) {
+			t.Errorf("watched event = %+v, want %+v", ev, wev)
+		}
+	case <-time.After(time.Second):
+		t.Fatalf("failed to watch the event")
+	}
+
+	wa, cancel = s.Watcher([]byte("foo1"), false, 1, 4)
+	defer cancel()
+
+	select {
+	case ev := <-wa.Event():
+		wev := storagepb.Event{
+			Type: storagepb.PUT,
+			Kv: &storagepb.KeyValue{
+				Key:            []byte("foo1"),
+				Value:          []byte("bar1"),
+				CreateRevision: 2,
+				ModRevision:    2,
+				Version:        1,
+			},
+		}
+		if !reflect.DeepEqual(ev, wev) {
+			t.Errorf("watched event = %+v, want %+v", ev, wev)
+		}
+	case <-time.After(time.Second):
+		t.Fatalf("failed to watch the event")
+	}
+
+	s.Put([]byte("foo1"), []byte("bar11"))
+	select {
+	case ev := <-wa.Event():
+		wev := storagepb.Event{
+			Type: storagepb.PUT,
+			Kv: &storagepb.KeyValue{
+				Key:            []byte("foo1"),
+				Value:          []byte("bar11"),
+				CreateRevision: 2,
+				ModRevision:    3,
+				Version:        2,
+			},
+		}
+		if !reflect.DeepEqual(ev, wev) {
+			t.Errorf("watched event = %+v, want %+v", ev, wev)
+		}
+	case <-time.After(time.Second):
+		t.Fatalf("failed to watch the event")
+	}
+
+	select {
+	case ev := <-wa.Event():
+		if !reflect.DeepEqual(ev, storagepb.Event{}) {
+			t.Errorf("watched event = %+v, want %+v", ev, storagepb.Event{})
+		}
+		if g := wa.Err(); g != ExceedEnd {
+			t.Errorf("err = %+v, want %+v", g, ExceedEnd)
+		}
+	case <-time.After(time.Second):
+		t.Fatalf("failed to watch the event")
+	}
+}
+
 func cleanup(s KV, path string) {
 	s.Close()
 	os.Remove(path)

+ 49 - 0
storage/kvstore.go

@@ -25,6 +25,7 @@ var (
 	ErrTxnIDMismatch = errors.New("storage: txn id mismatch")
 	ErrCompacted     = errors.New("storage: required revision has been compacted")
 	ErrFutureRev     = errors.New("storage: required revision is a future revision")
+	ErrCanceled      = errors.New("storage: watcher is canceled")
 )
 
 type store struct {
@@ -170,6 +171,54 @@ func (s *store) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err
 	return n, rev, nil
 }
 
+// RangeEvents gets the events on the given key range at or after startRev.
+// If startRev <= 0, RangeEvents returns events from the beginning of the history.
+// If endRev > 0, only events with main revision smaller than endRev are returned.
+// If `end` is nil, the request only observes the events on key.
+// If `end` is not nil, it observes the events on the key range [key, end).
+// If limit > 0, it limits the number of events returned.
+// If startRev has been compacted, ErrCompacted will be returned.
+// nextRev indicates the revision at which the caller can resume ranging.
+// TODO: return byte slices instead of events to avoid meaningless encode and decode.
+func (s *store) RangeEvents(key, end []byte, limit, startRev, endRev int64) (evs []storagepb.Event, nextRev int64, err error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if startRev <= s.compactMainRev {
+		return nil, 0, ErrCompacted
+	}
+
+	revs := s.kvindex.RangeEvents(key, end, startRev)
+	if len(revs) == 0 {
+		return nil, s.currentRev.main + 1, nil
+	}
+
+	tx := s.b.BatchTx()
+	tx.Lock()
+	defer tx.Unlock()
+	// fetch events from the backend using revisions
+	for _, rev := range revs {
+		if endRev > 0 && rev.main >= endRev {
+			return evs, rev.main, nil
+		}
+		revbytes := newRevBytes()
+		revToBytes(rev, revbytes)
+
+		_, vs := tx.UnsafeRange(keyBucketName, revbytes, nil, 0)
+		if len(vs) != 1 {
+			log.Fatalf("storage: range cannot find rev (%d,%d)", rev.main, rev.sub)
+		}
+
+		e := storagepb.Event{}
+		if err := e.Unmarshal(vs[0]); err != nil {
+			log.Fatalf("storage: cannot unmarshal event: %v", err)
+		}
+		evs = append(evs, e)
+		if limit > 0 && len(evs) >= int(limit) {
+			return evs, rev.main + 1, nil
+		}
+	}
+	return evs, s.currentRev.main + 1, nil
+}
+
 func (s *store) Compact(rev int64) error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
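
A minimal sketch of reading the event history through this method; it mirrors the KV-to-*store type assertion that watchable_store.go below relies on, and the path and keys are illustrative:

func rangeEventsStoreSketch() {
	var kv KV = newStore("test.db")
	defer kv.Close()

	kv.Put([]byte("foo"), []byte("bar"))   // rev 1
	kv.Put([]byte("foo1"), []byte("bar1")) // rev 2

	// all events on keys in [foo, fop) at or after rev 1, with no limit and no end revision
	evs, nextRev, err := kv.(*store).RangeEvents([]byte("foo"), []byte("fop"), 0, 1, 0)
	if err == ErrCompacted {
		// rev 1 has already been compacted away
	}
	_, _ = evs, nextRev // both PUT events; nextRev == currentRev+1 == 3
}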

+ 3 - 0
storage/kvstore_test.go

@@ -473,6 +473,9 @@ func (i *fakeIndex) Tombstone(key []byte, rev revision) error {
 	i.Recorder.Record(testutil.Action{Name: "tombstone", Params: []interface{}{key, rev}})
 	return nil
 }
+func (i *fakeIndex) RangeEvents(key, end []byte, rev int64) []revision {
+	return nil
+}
 func (i *fakeIndex) Compact(rev int64) map[revision]struct{} {
 	i.Recorder.Record(testutil.Action{Name: "compact", Params: []interface{}{rev}})
 	return <-i.indexCompactRespc

+ 6 - 0
storage/revision.go

@@ -33,3 +33,9 @@ func bytesToRev(bytes []byte) revision {
 		sub:  int64(binary.BigEndian.Uint64(bytes[9:])),
 	}
 }
+
+type revisions []revision
+
+func (a revisions) Len() int           { return len(a) }
+func (a revisions) Less(i, j int) bool { return a[j].GreaterThan(a[i]) }
+func (a revisions) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }

+ 336 - 0
storage/watchable_store.go

@@ -0,0 +1,336 @@
+package storage
+
+import (
+	"errors"
+	"log"
+	"sync"
+	"time"
+
+	"github.com/coreos/etcd/storage/storagepb"
+)
+
+// ExceedEnd is the error returned by Watcher.Err when the watcher reaches its
+// end revision and no more events are available.
+var ExceedEnd = errors.New("storage: watcher reaches end revision")
+
+type watchableStore struct {
+	mu sync.Mutex
+
+	KV
+
+	// contains all unsynced watchers that need to sync events that have happened
+	// TODO: use map to reduce cancel cost
+	unsynced []*watcher
+	// contains all synced watchers that are tracking the events that will happen
+	// The key of the map is the key that the watcher is watching on.
+	synced map[string][]*watcher
+	// contains all synced watchers that have an end revision
+	// The key of the map is the end revision of the watcher.
+	endm map[int64][]*watcher
+	tx   *ongoingTx
+
+	stopc chan struct{}
+	wg    sync.WaitGroup
+}
+
+func newWatchableStore(path string) *watchableStore {
+	s := &watchableStore{
+		KV:     newStore(path),
+		synced: make(map[string][]*watcher),
+		endm:   make(map[int64][]*watcher),
+		stopc:  make(chan struct{}),
+	}
+	s.wg.Add(1)
+	go s.syncWatchersLoop()
+	return s
+}
+
+func (s *watchableStore) Put(key, value []byte) (rev int64) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	rev = s.KV.Put(key, value)
+	// TODO: avoid this range
+	kvs, _, err := s.KV.Range(key, nil, 0, rev)
+	if err != nil {
+		log.Panicf("unexpected range error (%v)", err)
+	}
+	s.handle(rev, storagepb.Event{
+		Type: storagepb.PUT,
+		Kv:   &kvs[0],
+	})
+	return rev
+}
+
+func (s *watchableStore) DeleteRange(key, end []byte) (n, rev int64) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// TODO: avoid this range
+	kvs, _, err := s.KV.Range(key, end, 0, 0)
+	if err != nil {
+		log.Panicf("unexpected range error (%v)", err)
+	}
+	n, rev = s.KV.DeleteRange(key, end)
+	for _, kv := range kvs {
+		s.handle(rev, storagepb.Event{
+			Type: storagepb.DELETE,
+			Kv: &storagepb.KeyValue{
+				Key: kv.Key,
+			},
+		})
+	}
+	return n, rev
+}
+
+func (s *watchableStore) TxnBegin() int64 {
+	s.mu.Lock()
+	s.tx = newOngoingTx()
+	return s.KV.TxnBegin()
+}
+
+func (s *watchableStore) TxnPut(txnID int64, key, value []byte) (rev int64, err error) {
+	rev, err = s.KV.TxnPut(txnID, key, value)
+	if err == nil {
+		s.tx.put(string(key))
+	}
+	return rev, err
+}
+
+func (s *watchableStore) TxnDeleteRange(txnID int64, key, end []byte) (n, rev int64, err error) {
+	kvs, _, err := s.KV.TxnRange(txnID, key, end, 0, 0)
+	if err != nil {
+		log.Panicf("unexpected range error (%v)", err)
+	}
+	n, rev, err = s.KV.TxnDeleteRange(txnID, key, end)
+	if err == nil {
+		for _, kv := range kvs {
+			s.tx.del(string(kv.Key))
+		}
+	}
+	return n, rev, err
+}
+
+func (s *watchableStore) TxnEnd(txnID int64) error {
+	err := s.KV.TxnEnd(txnID)
+	if err != nil {
+		return err
+	}
+
+	_, rev, _ := s.KV.Range(nil, nil, 0, 0)
+	for k := range s.tx.putm {
+		kvs, _, err := s.KV.Range([]byte(k), nil, 0, 0)
+		if err != nil {
+			log.Panicf("unexpected range error (%v)", err)
+		}
+		s.handle(rev, storagepb.Event{
+			Type: storagepb.PUT,
+			Kv:   &kvs[0],
+		})
+	}
+	for k := range s.tx.delm {
+		s.handle(rev, storagepb.Event{
+			Type: storagepb.DELETE,
+			Kv: &storagepb.KeyValue{
+				Key: []byte(k),
+			},
+		})
+	}
+	s.mu.Unlock()
+	return nil
+}
+
+func (s *watchableStore) Close() error {
+	close(s.stopc)
+	s.wg.Wait()
+	return s.KV.Close()
+}
+
+func (s *watchableStore) Watcher(key []byte, prefix bool, startRev, endRev int64) (Watcher, CancelFunc) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	wa := newWatcher(key, prefix, startRev, endRev)
+	k := string(key)
+	if startRev == 0 {
+		s.synced[k] = append(s.synced[k], wa)
+		if endRev != 0 {
+			s.endm[endRev] = append(s.endm[endRev], wa)
+		}
+	} else {
+		s.unsynced = append(s.unsynced, wa)
+	}
+
+	cancel := CancelFunc(func() {
+		s.mu.Lock()
+		defer s.mu.Unlock()
+		wa.stopWithError(ErrCanceled)
+
+		// remove global references of the watcher
+		for i, w := range s.unsynced {
+			if w == wa {
+				s.unsynced = append(s.unsynced[:i], s.unsynced[i+1:]...)
+				return
+			}
+		}
+
+		for i, w := range s.synced[k] {
+			if w == wa {
+				s.synced[k] = append(s.synced[k][:i], s.synced[k][i+1:]...)
+			}
+		}
+		if wa.end != 0 {
+			for i, w := range s.endm[wa.end] {
+				if w == wa {
+					s.endm[wa.end] = append(s.endm[wa.end][:i], s.endm[wa.end][i+1:]...)
+				}
+			}
+		}
+		// If the watcher cannot be found in any list, it has already finished its watch.
+	})
+
+	return wa, cancel
+}
+
+// syncWatchersLoop syncs the watchers in the unsynced list every 100ms.
+func (s *watchableStore) syncWatchersLoop() {
+	defer s.wg.Done()
+
+	for {
+		s.mu.Lock()
+		s.syncWatchers()
+		s.mu.Unlock()
+
+		select {
+		case <-time.After(100 * time.Millisecond):
+		case <-s.stopc:
+			return
+		}
+	}
+}
+
+// syncWatchers syncs the watchers in the unsynced list.
+func (s *watchableStore) syncWatchers() {
+	_, curRev, _ := s.KV.Range(nil, nil, 0, 0)
+
+	// filtering without allocating
+	// https://github.com/golang/go/wiki/SliceTricks#filtering-without-allocating
+	nws := s.unsynced[:0]
+	for _, w := range s.unsynced {
+		var end []byte
+		if w.prefix {
+			end = make([]byte, len(w.key))
+			copy(end, w.key)
+			end[len(w.key)-1]++
+		}
+		limit := cap(w.ch) - len(w.ch)
+		// the channel is full, try it in the next round
+		if limit == 0 {
+			nws = append(nws, w)
+			continue
+		}
+		evs, nextRev, err := s.KV.(*store).RangeEvents(w.key, end, int64(limit), w.cur, w.end)
+		if err != nil {
+			w.stopWithError(err)
+			continue
+		}
+
+		// push events to the channel
+		for _, ev := range evs {
+			w.ch <- ev
+		}
+		// stop watcher if it reaches the end
+		if w.end > 0 && nextRev >= w.end {
+			w.stopWithError(ExceedEnd)
+			continue
+		}
+		// switch to tracking future events if needed
+		if nextRev > curRev {
+			s.synced[string(w.key)] = append(s.synced[string(w.key)], w)
+			if w.end != 0 {
+				s.endm[w.end] = append(s.endm[w.end], w)
+			}
+			continue
+		}
+		// put it back to try it in the next round
+		w.cur = nextRev
+		nws = append(nws, w)
+	}
+	s.unsynced = nws
+}
+
+// handle notifies the watchers of the event that happened at the given rev
+// and stops the watchers that have reached their end revision.
+func (s *watchableStore) handle(rev int64, ev storagepb.Event) {
+	s.notify(rev, ev)
+	s.stopWatchers(rev)
+}
+
+// notify delivers the given event, which happened at the given rev, to the
+// watchers watching on the key of the event.
+func (s *watchableStore) notify(rev int64, ev storagepb.Event) {
+	// check all prefixes of the key to notify all corresponding watchers
+	for i := 0; i <= len(ev.Kv.Key); i++ {
+		ws := s.synced[string(ev.Kv.Key[:i])]
+		nws := ws[:0]
+		for _, w := range ws {
+			// the watcher needs to be notified when either it watches prefix or
+			// the key is exactly matched.
+			if !w.prefix && i != len(ev.Kv.Key) {
+				continue
+			}
+			select {
+			case w.ch <- ev:
+				nws = append(nws, w)
+			default:
+				// the channel is full; move the watcher back to unsynced
+				if w.end != 0 {
+					for i, ew := range s.endm[w.end] {
+						if ew == w {
+							s.endm[w.end] = append(s.endm[w.end][:i], s.endm[w.end][i+1:]...)
+						}
+					}
+				}
+				w.cur = rev
+				s.unsynced = append(s.unsynced, w)
+			}
+		}
+		s.synced[string(ev.Kv.Key[:i])] = nws
+	}
+}
+
+// stopWatchers stops the watchers whose end revision is rev+1, since they
+// have observed all the events they asked for (up to and including rev).
+func (s *watchableStore) stopWatchers(rev int64) {
+	for _, wa := range s.endm[rev+1] {
+		k := string(wa.key)
+		for i, w := range s.synced[k] {
+			if w == wa {
+				s.synced[k] = append(s.synced[k][:i], s.synced[k][i+1:]...)
+				break
+			}
+		}
+		wa.stopWithError(ExceedEnd)
+	}
+	delete(s.endm, rev+1)
+}
+
+type ongoingTx struct {
+	// keys put/deleted in the ongoing txn
+	putm map[string]bool
+	delm map[string]bool
+}
+
+func newOngoingTx() *ongoingTx {
+	return &ongoingTx{
+		putm: make(map[string]bool),
+		delm: make(map[string]bool),
+	}
+}
+
+func (tx *ongoingTx) put(k string) {
+	tx.putm[k] = true
+	tx.delm[k] = false
+}
+
+func (tx *ongoingTx) del(k string) {
+	tx.delm[k] = true
+	tx.putm[k] = false
+}
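
One detail in syncWatchers worth spelling out: a prefix watcher is synced by converting its key into a half-open key range, obtained by incrementing the last byte of the key, so that RangeEvents covers exactly the keys carrying that prefix. A small sketch of the conversion (it assumes a non-empty key whose last byte is below 0xff, as the code above does):

func prefixRangeSketch(key []byte) (start, end []byte) {
	// e.g. key "foo" yields the range ["foo", "fop"); every key with
	// prefix "foo" sorts inside it
	end = make([]byte, len(key))
	copy(end, key)
	end[len(end)-1]++
	return key, end
}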

+ 46 - 0
storage/watcher.go

@@ -0,0 +1,46 @@
+package storage
+
+import (
+	"sync"
+
+	"github.com/coreos/etcd/storage/storagepb"
+)
+
+type watcher struct {
+	key    []byte
+	prefix bool
+	cur    int64
+	end    int64
+
+	ch  chan storagepb.Event
+	mu  sync.Mutex
+	err error
+}
+
+func newWatcher(key []byte, prefix bool, start, end int64) *watcher {
+	return &watcher{
+		key:    key,
+		prefix: prefix,
+		cur:    start,
+		end:    end,
+		ch:     make(chan storagepb.Event, 10),
+	}
+}
+
+func (w *watcher) Event() <-chan storagepb.Event { return w.ch }
+
+func (w *watcher) Err() error {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	return w.err
+}
+
+func (w *watcher) stopWithError(err error) {
+	if w.err != nil {
+		return
+	}
+	close(w.ch)
+	w.mu.Lock()
+	w.err = err
+	w.mu.Unlock()
+}

+ 17 - 0
storage/watcher_bench_test.go

@@ -0,0 +1,17 @@
+package storage
+
+import (
+	"fmt"
+	"testing"
+)
+
+func BenchmarkKVWatcherMemoryUsage(b *testing.B) {
+	s := newWatchableStore(tmpPath)
+	defer cleanup(s, tmpPath)
+
+	b.ReportAllocs()
+	b.StartTimer()
+	for i := 0; i < b.N; i++ {
+		s.Watcher([]byte(fmt.Sprint("foo", i)), false, 0, 0)
+	}
+}