Browse Source

etcd: fix refresh feature

When using refresh, etcd store v2 watch is broken. Although with refresh
store should not trigger current watchers, it should still add events into
the watchhub to make a complete history. Current store fails to add the event
into the watchhub, which causes issues.
Xiang Li 9 years ago
parent
commit
53084ebead
5 changed files with 38 additions and 17 deletions
  1. 4 3
      Documentation/v2/api.md
  2. 5 0
      store/event.go
  3. 16 14
      store/event_history.go
  4. 9 0
      store/store.go
  5. 4 0
      store/watcher_hub.go

+ 4 - 3
Documentation/v2/api.md

@@ -233,10 +233,11 @@ curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=bar -d ttl= -d prevExist=t
 
 
 ### Refreshing key TTL
 ### Refreshing key TTL
 
 
-Keys in etcd can be refreshed without notifying watchers
-this can be achieved by setting the refresh to true when updating a TTL
+Keys in etcd can be refreshed without notifying current watchers.
 
 
-You cannot update the value of a key when refreshing it
+This can be achieved by setting the refresh to true when updating a TTL.
+
+You cannot update the value of a key when refreshing it.
 
 
 ```sh
 ```sh
 curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=bar -d ttl=5
 curl http://127.0.0.1:2379/v2/keys/foo -XPUT -d value=bar -d ttl=5

+ 5 - 0
store/event.go

@@ -30,6 +30,7 @@ type Event struct {
 	Node      *NodeExtern `json:"node,omitempty"`
 	Node      *NodeExtern `json:"node,omitempty"`
 	PrevNode  *NodeExtern `json:"prevNode,omitempty"`
 	PrevNode  *NodeExtern `json:"prevNode,omitempty"`
 	EtcdIndex uint64      `json:"-"`
 	EtcdIndex uint64      `json:"-"`
+	Refresh   bool        `json:"refresh,omitempty"`
 }
 }
 
 
 func newEvent(action string, key string, modifiedIndex, createdIndex uint64) *Event {
 func newEvent(action string, key string, modifiedIndex, createdIndex uint64) *Event {
@@ -64,3 +65,7 @@ func (e *Event) Clone() *Event {
 		PrevNode:  e.PrevNode.Clone(),
 		PrevNode:  e.PrevNode.Clone(),
 	}
 	}
 }
 }
+
+func (e *Event) SetRefresh() {
+	e.Refresh = true
+}

+ 16 - 14
store/event_history.go

@@ -78,24 +78,26 @@ func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event,
 	for {
 	for {
 		e := eh.Queue.Events[i]
 		e := eh.Queue.Events[i]
 
 
-		ok := (e.Node.Key == key)
+		if !e.Refresh {
+			ok := (e.Node.Key == key)
 
 
-		if recursive {
-			// add tailing slash
-			key = path.Clean(key)
-			if key[len(key)-1] != '/' {
-				key = key + "/"
-			}
+			if recursive {
+				// add tailing slash
+				key = path.Clean(key)
+				if key[len(key)-1] != '/' {
+					key = key + "/"
+				}
 
 
-			ok = ok || strings.HasPrefix(e.Node.Key, key)
-		}
+				ok = ok || strings.HasPrefix(e.Node.Key, key)
+			}
 
 
-		if (e.Action == Delete || e.Action == Expire) && e.PrevNode != nil && e.PrevNode.Dir {
-			ok = ok || strings.HasPrefix(key, e.PrevNode.Key)
-		}
+			if (e.Action == Delete || e.Action == Expire) && e.PrevNode != nil && e.PrevNode.Dir {
+				ok = ok || strings.HasPrefix(key, e.PrevNode.Key)
+			}
 
 
-		if ok {
-			return e, nil
+			if ok {
+				return e, nil
+			}
 		}
 		}
 
 
 		i = (i + 1) % eh.Queue.Capacity
 		i = (i + 1) % eh.Queue.Capacity

+ 9 - 0
store/store.go

@@ -236,6 +236,9 @@ func (s *store) Set(nodePath string, dir bool, value string, expireOpts TTLOptio
 
 
 	if !expireOpts.Refresh {
 	if !expireOpts.Refresh {
 		s.WatcherHub.notify(e)
 		s.WatcherHub.notify(e)
+	} else {
+		e.SetRefresh()
+		s.WatcherHub.add(e)
 	}
 	}
 
 
 	return e, nil
 	return e, nil
@@ -314,6 +317,9 @@ func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint
 
 
 	if !expireOpts.Refresh {
 	if !expireOpts.Refresh {
 		s.WatcherHub.notify(e)
 		s.WatcherHub.notify(e)
+	} else {
+		e.SetRefresh()
+		s.WatcherHub.add(e)
 	}
 	}
 
 
 	return e, nil
 	return e, nil
@@ -539,6 +545,9 @@ func (s *store) Update(nodePath string, newValue string, expireOpts TTLOptionSet
 
 
 	if !expireOpts.Refresh {
 	if !expireOpts.Refresh {
 		s.WatcherHub.notify(e)
 		s.WatcherHub.notify(e)
+	} else {
+		e.SetRefresh()
+		s.WatcherHub.add(e)
 	}
 	}
 
 
 	s.CurrentIndex = nextIndex
 	s.CurrentIndex = nextIndex

+ 4 - 0
store/watcher_hub.go

@@ -115,6 +115,10 @@ func (wh *watcherHub) watch(key string, recursive, stream bool, index, storeInde
 	return w, nil
 	return w, nil
 }
 }
 
 
+func (wh *watcherHub) add(e *Event) {
+	e = wh.EventHistory.addEvent(e)
+}
+
 // notify function accepts an event and notify to the watchers.
 // notify function accepts an event and notify to the watchers.
 func (wh *watcherHub) notify(e *Event) {
 func (wh *watcherHub) notify(e *Event) {
 	e = wh.EventHistory.addEvent(e) // add event into the eventHistory
 	e = wh.EventHistory.addEvent(e) // add event into the eventHistory