Browse Source

fix race between Expire() and others, fix UpdateTTL(), modified watcher to catch Expire()

evan-gu 12 years ago
parent
commit
b8ac1d082b
8 changed files with 78 additions and 76 deletions
  1. 11 18
      store/event.go
  2. 6 6
      store/event_test.go
  3. 11 5
      store/node.go
  4. 12 0
      store/stats_test.go
  5. 3 2
      store/store.go
  6. 27 36
      store/store_test.go
  7. 1 1
      store/watcher.go
  8. 7 8
      store/watcher_test.go

+ 11 - 18
store/event.go

@@ -16,6 +16,8 @@ const (
 	Delete     = "delete"
 	TestAndSet = "testAndSet"
 	Expire     = "expire"
+	UndefIndex = 0
+	UndefTerm  = 0
 )
 
 type Event struct {
@@ -92,6 +94,7 @@ type EventHistory struct {
 	StartIndex uint64
 	LastIndex  uint64
 	LastTerm   uint64
+	DupIndex   uint64
 	rwl        sync.RWMutex
 }
 
@@ -109,12 +112,16 @@ func (eh *EventHistory) addEvent(e *Event) *Event {
 	eh.rwl.Lock()
 	defer eh.rwl.Unlock()
 
-	if e.Index == 0 {
+	DupIndex := uint64(0)
+
+	if e.Index == UndefIndex {
 		e.Index = eh.LastIndex
+		DupIndex = 1
 	}
 
-	if e.Term == 0 {
+	if e.Term == UndefTerm {
 		e.Term = eh.LastTerm
+		DupIndex = 1
 	}
 
 	eh.Queue.insert(e)
@@ -123,32 +130,18 @@ func (eh *EventHistory) addEvent(e *Event) *Event {
 
 	eh.LastIndex = e.Index
 	eh.LastTerm = e.Term
+	eh.DupIndex += DupIndex
 
 	return e
 }
 
-// addEvent with the last event's index and term
-/*func (eh *EventHistory) addEventWithouIndex(action, key string) (e *Event) {
-	eh.rwl.Lock()
-	defer eh.rwl.Unlock()
-
-	LastEvent := eh.Queue.Events[eh.Queue.back()]
-	e = newEvent(action, key, LastEvent.Index, LastEvent.Term);
-
-	eh.Queue.insert(e)
-
-	eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index
-
-	return e;
-}*/
-
 // scan function is enumerating events from the index in history and
 // stops till the first point where the key has identified prefix
 func (eh *EventHistory) scan(prefix string, index uint64) (*Event, error) {
 	eh.rwl.RLock()
 	defer eh.rwl.RUnlock()
 
-	start := index - eh.StartIndex
+	start := index - eh.StartIndex + eh.DupIndex
 
 	// the index should locate after the event history's StartIndex
 	// and before its size

+ 6 - 6
store/event_test.go

@@ -13,7 +13,7 @@ func TestEventQueue(t *testing.T) {
 
 	// Add
 	for i := 0; i < 200; i++ {
-		e := newEvent(Create, "/foo", uint64(i), 0)
+		e := newEvent(Create, "/foo", uint64(i), 1)
 		eh.addEvent(e)
 	}
 
@@ -37,11 +37,11 @@ func TestScanHistory(t *testing.T) {
 	eh := newEventHistory(100)
 
 	// Add
-	eh.addEvent(newEvent(Create, "/foo", 1, 0))
-	eh.addEvent(newEvent(Create, "/foo/bar", 2, 0))
-	eh.addEvent(newEvent(Create, "/foo/foo", 3, 0))
-	eh.addEvent(newEvent(Create, "/foo/bar/bar", 4, 0))
-	eh.addEvent(newEvent(Create, "/foo/foo/foo", 5, 0))
+	eh.addEvent(newEvent(Create, "/foo", 1, 1))
+	eh.addEvent(newEvent(Create, "/foo/bar", 2, 1))
+	eh.addEvent(newEvent(Create, "/foo/foo", 3, 1))
+	eh.addEvent(newEvent(Create, "/foo/bar/bar", 4, 1))
+	eh.addEvent(newEvent(Create, "/foo/foo/foo", 5, 1))
 
 	e, err := eh.scan("/foo", 1)
 	if err != nil || e.Index != 1 {

+ 11 - 5
store/node.go

@@ -66,6 +66,7 @@ func newDir(nodePath string, createIndex uint64, createTerm uint64, parent *Node
 // If the node is a directory and recursive is true, the function will recursively remove
 // add nodes under the receiver node.
 func (n *Node) Remove(recursive bool, callback func(path string)) error {
+
 	n.mu.Lock()
 	defer n.mu.Unlock()
 
@@ -87,6 +88,7 @@ func (n *Node) Remove(recursive bool, callback func(path string)) error {
 
 			n.stopExpire <- true
 			n.status = removed
+
 		}
 
 		return nil
@@ -265,14 +267,14 @@ func (n *Node) Expire(s *Store) {
 		select {
 		// if timeout, delete the node
 		case <-time.After(duration):
-			e := newEvent(Expire, n.Path, 0, 0)
+			s.worldLock.Lock()
 
+			e := newEvent(Expire, n.Path, UndefIndex, UndefTerm)
+			s.WatcherHub.notify(e)
 			n.Remove(true, nil)
-
 			s.Stats.Inc(ExpireCount)
 
-			s.WatcherHub.notify(e)
-
+			s.worldLock.Unlock()
 			return
 
 		// if stopped, return
@@ -364,7 +366,11 @@ func (n *Node) Pair(recurisive, sorted bool) KeyValuePair {
 
 func (n *Node) UpdateTTL(expireTime time.Time, s *Store) {
 	if !n.IsPermanent() {
-		n.stopExpire <- true // suspend it to modify the expiration
+		expired, _ := n.IsExpired()
+
+		if !expired {
+			n.stopExpire <- true // suspend it to modify the expiration
+		}
 	}
 
 	if expireTime.Sub(Permanent) != 0 {

+ 12 - 0
store/stats_test.go

@@ -24,6 +24,8 @@ func TestBasicStats(t *testing.T) {
 		}
 	}
 
+	//fmt.Println("create")
+
 	time.Sleep(time.Second * 3)
 
 	for _, k := range keys {
@@ -35,6 +37,8 @@ func TestBasicStats(t *testing.T) {
 		}
 	}
 
+	//fmt.Println("get")
+
 	for _, k := range keys {
 		i++
 		_, err := s.Update(k, "foo", time.Now().Add(time.Second*time.Duration(rand.Intn(6))), i, 1)
@@ -45,6 +49,8 @@ func TestBasicStats(t *testing.T) {
 		}
 	}
 
+	//fmt.Println("update")
+
 	time.Sleep(time.Second * 3)
 
 	for _, k := range keys {
@@ -66,11 +72,15 @@ func TestBasicStats(t *testing.T) {
 		}
 	}
 
+	//fmt.Println("get testAndSet")
+
 	for _, k := range keys {
 		s.Watch(k, false, 0, i, 1)
 		watcher_number++
 	}
 
+	//fmt.Println("watch")
+
 	for _, k := range keys {
 		_, err := s.Get(k, false, false, i, 1)
 		if err != nil {
@@ -91,6 +101,8 @@ func TestBasicStats(t *testing.T) {
 		}
 	}
 
+	//fmt.Println("get delete")
+
 	for _, k := range keys {
 		_, err := s.Get(k, false, false, i, 1)
 		if err != nil {

+ 3 - 2
store/store.go

@@ -164,6 +164,7 @@ func (s *Store) Create(nodePath string, value string, expireTime time.Time, inde
 // If the node is a file, the value and the ttl can be updated.
 // If the node is a directory, only the ttl can be updated.
 func (s *Store) Update(nodePath string, value string, expireTime time.Time, index uint64, term uint64) (*Event, error) {
+
 	s.worldLock.RLock()
 	defer s.worldLock.RUnlock()
 
@@ -171,15 +172,16 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde
 
 	if err != nil { // if the node does not exist, return error
 		s.Stats.Inc(UpdateFail)
+
 		return nil, err
 	}
 
 	e := newEvent(Update, nodePath, s.Index, s.Term)
 
 	if n.IsDir() { // if the node is a directory, we can only update ttl
-
 		if len(value) != 0 {
 			s.Stats.Inc(UpdateFail)
+
 			return nil, etcdErr.NewError(etcdErr.EcodeNotFile, nodePath)
 		}
 
@@ -195,7 +197,6 @@ func (s *Store) Update(nodePath string, value string, expireTime time.Time, inde
 
 	// update ttl
 	n.UpdateTTL(expireTime, s)
-
 	e.Expiration = &n.ExpireTime
 	e.TTL = int64(expireTime.Sub(time.Now())/time.Second) + 1
 

+ 27 - 36
store/store_test.go

@@ -320,88 +320,88 @@ func TestTestAndSet(t *testing.T) { // TODO prevValue == nil ?
 func TestWatch(t *testing.T) {
 	s := New()
 	// watch at a deeper path
-	c, _ := s.WatcherHub.watch("/foo/foo/foo", false, 0)
+	c, _ := s.Watch("/foo/foo/foo", false, 0, 0, 1)
 	s.Create("/foo/foo/foo", "bar", Permanent, 1, 1)
 
 	e := nonblockingRetrive(c)
-	if e.Key != "/foo/foo/foo" {
-		t.Fatal("watch for Create node fails")
+	if e.Key != "/foo/foo/foo" || e.Action != Create {
+		t.Fatal("watch for Create node fails ", e)
 	}
 
-	c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0)
+	c, _ = s.Watch("/foo/foo/foo", false, 0, 1, 1)
 	s.Update("/foo/foo/foo", "car", Permanent, 2, 1)
 	e = nonblockingRetrive(c)
-	if e.Key != "/foo/foo/foo" {
-		t.Fatal("watch for Update node fails")
+	if e.Key != "/foo/foo/foo" || e.Action != Update {
+		t.Fatal("watch for Update node fails ", e)
 	}
 
-	c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0)
+	c, _ = s.Watch("/foo/foo/foo", false, 0, 2, 1)
 	s.TestAndSet("/foo/foo/foo", "car", 0, "bar", Permanent, 3, 1)
 	e = nonblockingRetrive(c)
-	if e.Key != "/foo/foo/foo" {
+	if e.Key != "/foo/foo/foo" || e.Action != TestAndSet {
 		t.Fatal("watch for TestAndSet node fails")
 	}
 
-	c, _ = s.WatcherHub.watch("/foo/foo/foo", false, 0)
+	c, _ = s.Watch("/foo/foo/foo", false, 0, 3, 1)
 	s.Delete("/foo", true, 4, 1) //recursively delete
 	e = nonblockingRetrive(c)
-	if e.Key != "/foo" {
-		t.Fatal("watch for Delete node fails")
+	if e.Key != "/foo" || e.Action != Delete {
+		t.Fatal("watch for Delete node fails ", e)
 	}
 
 	// watch at a prefix
-	c, _ = s.WatcherHub.watch("/foo", true, 0)
+	c, _ = s.Watch("/foo", true, 0, 4, 1)
 	s.Create("/foo/foo/boo", "bar", Permanent, 5, 1)
 	e = nonblockingRetrive(c)
-	if e.Key != "/foo/foo/boo" {
+	if e.Key != "/foo/foo/boo" || e.Action != Create {
 		t.Fatal("watch for Create subdirectory fails")
 	}
 
-	c, _ = s.WatcherHub.watch("/foo", true, 0)
+	c, _ = s.Watch("/foo", true, 0, 5, 1)
 	s.Update("/foo/foo/boo", "foo", Permanent, 6, 1)
 	e = nonblockingRetrive(c)
-	if e.Key != "/foo/foo/boo" {
+	if e.Key != "/foo/foo/boo" || e.Action != Update {
 		t.Fatal("watch for Update subdirectory fails")
 	}
 
-	c, _ = s.WatcherHub.watch("/foo", true, 0)
+	c, _ = s.Watch("/foo", true, 0, 6, 1)
 	s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", Permanent, 7, 1)
 	e = nonblockingRetrive(c)
-	if e.Key != "/foo/foo/boo" {
+	if e.Key != "/foo/foo/boo" || e.Action != TestAndSet {
 		t.Fatal("watch for TestAndSet subdirectory fails")
 	}
 
-	c, _ = s.WatcherHub.watch("/foo", true, 0)
+	c, _ = s.Watch("/foo", true, 0, 7, 1)
 	s.Delete("/foo/foo/boo", false, 8, 1)
 	e = nonblockingRetrive(c)
-	if e.Key != "/foo/foo/boo" {
+	if e.Key != "/foo/foo/boo" || e.Action != Delete {
 		t.Fatal("watch for Delete subdirectory fails")
 	}
 
 	// watch expire
 	s.Create("/foo/foo/boo", "foo", time.Now().Add(time.Second*1), 9, 1)
-	c, _ = s.WatcherHub.watch("/foo", true, 0)
+	c, _ = s.Watch("/foo", true, 0, 9, 1)
 	time.Sleep(time.Second * 2)
 	e = nonblockingRetrive(c)
-	if e.Key != "/foo/foo/boo" || e.Index != 9 {
+	if e.Key != "/foo/foo/boo" || e.Action != Expire || e.Index != 9 {
 		t.Fatal("watch for Expiration of Create() subdirectory fails ", e)
 	}
 
 	s.Create("/foo/foo/boo", "foo", Permanent, 10, 1)
 	s.Update("/foo/foo/boo", "bar", time.Now().Add(time.Second*1), 11, 1)
-	c, _ = s.WatcherHub.watch("/foo", true, 0)
+	c, _ = s.Watch("/foo", true, 0, 11, 1)
 	time.Sleep(time.Second * 2)
 	e = nonblockingRetrive(c)
-	if e.Key != "/foo/foo/boo" || e.Index != 11 {
+	if e.Key != "/foo/foo/boo" || e.Action != Expire || e.Index != 11 {
 		t.Fatal("watch for Expiration of Update() subdirectory fails ", e)
 	}
 
 	s.Create("/foo/foo/boo", "foo", Permanent, 12, 1)
 	s.TestAndSet("/foo/foo/boo", "foo", 0, "bar", time.Now().Add(time.Second*1), 13, 1)
-	c, _ = s.WatcherHub.watch("/foo", true, 0)
+	c, _ = s.Watch("/foo", true, 0, 13, 1)
 	time.Sleep(time.Second * 2)
 	e = nonblockingRetrive(c)
-	if e.Key != "/foo/foo/boo" || e.Index != 13 {
+	if e.Key != "/foo/foo/boo" || e.Action != Expire || e.Index != 13 {
 		t.Fatal("watch for Expiration of TestAndSet() subdirectory fails ", e)
 	}
 
@@ -479,21 +479,12 @@ func TestSaveAndRecover(t *testing.T) {
 			panic(err)
 		}
 	}
-
+	s.worldLock.RLock()
+	defer s.worldLock.RUnlock()
 	if s.WatcherHub.EventHistory.StartIndex != cloneFs.WatcherHub.EventHistory.StartIndex {
 		t.Fatal("Error recovered event history start index")
 	}
 
-	//t.Log("watcherhub.size: ", s.WatcherHub.EventHistory.Queue.Size)
-	//for i = 0; int(i) < s.WatcherHub.EventHistory.Queue.Size; i++ {
-	//	t.Log(s.WatcherHub.EventHistory.Queue.Events[i])
-	//}
-	//
-	//t.Log("ClonedWatcherhub.size: ", cloneFs.WatcherHub.EventHistory.Queue.Size)
-	//for i = 0; int(i) < cloneFs.WatcherHub.EventHistory.Queue.Size; i++ {
-	//	t.Log(cloneFs.WatcherHub.EventHistory.Queue.Events[i])
-	//}
-
 	for i = 0; int(i) < cloneFs.WatcherHub.EventHistory.Queue.Size; i++ {
 		if s.WatcherHub.EventHistory.Queue.Events[i].Key !=
 			cloneFs.WatcherHub.EventHistory.Queue.Events[i].Key {

+ 1 - 1
store/watcher.go

@@ -47,7 +47,7 @@ func (wh *watcherHub) watch(prefix string, recursive bool, index uint64) (<-chan
 	w := &watcher{
 		eventChan:  eventChan,
 		recursive:  recursive,
-		sinceIndex: index,
+		sinceIndex: index - 1, // to catch Expire()
 	}
 
 	l, ok := wh.watchers[prefix]

+ 7 - 8
store/watcher_test.go

@@ -7,8 +7,7 @@ import (
 func TestWatcher(t *testing.T) {
 	s := New()
 	wh := s.WatcherHub
-	c, err := wh.watch("/foo", true, 0)
-
+	c, err := wh.watch("/foo", true, 1)
 	if err != nil {
 		t.Fatal("%v", err)
 	}
@@ -20,7 +19,7 @@ func TestWatcher(t *testing.T) {
 		// do nothing
 	}
 
-	e := newEvent(Create, "/foo/bar", 1, 0)
+	e := newEvent(Create, "/foo/bar", 1, 1)
 
 	wh.notify(e)
 
@@ -30,20 +29,20 @@ func TestWatcher(t *testing.T) {
 		t.Fatal("recv != send")
 	}
 
-	c, _ = wh.watch("/foo", false, 0)
+	c, _ = wh.watch("/foo", false, 2)
 
-	e = newEvent(Create, "/foo/bar", 1, 0)
+	e = newEvent(Create, "/foo/bar", 2, 1)
 
 	wh.notify(e)
 
 	select {
-	case <-c:
-		t.Fatal("should not receive from channel if not recursive")
+	case re = <-c:
+		t.Fatal("should not receive from channel if not recursive ", re)
 	default:
 		// do nothing
 	}
 
-	e = newEvent(Create, "/foo", 1, 0)
+	e = newEvent(Create, "/foo", 3, 1)
 
 	wh.notify(e)