|
|
@@ -146,14 +146,17 @@ func (h *handler) ttlKeepAlive(k string, value string, ttl int, stopChan chan bo
|
|
|
func (h *handler) watch(keypath string, index int, closeChan <-chan bool) error {
|
|
|
// Wrap close chan so we can pass it to Client.Watch().
|
|
|
stopWatchChan := make(chan bool)
|
|
|
+ stopWrapChan := make(chan bool)
|
|
|
go func() {
|
|
|
select {
|
|
|
case <-closeChan:
|
|
|
stopWatchChan <- true
|
|
|
- case <-stopWatchChan:
|
|
|
+ case <- stopWrapChan:
|
|
|
+ stopWatchChan <- true
|
|
|
+ case <- stopWatchChan:
|
|
|
}
|
|
|
}()
|
|
|
- defer close(stopWatchChan)
|
|
|
+ defer close(stopWrapChan)
|
|
|
|
|
|
for {
|
|
|
// Read all nodes for the lock.
|
|
|
@@ -161,7 +164,6 @@ func (h *handler) watch(keypath string, index int, closeChan <-chan bool) error
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("lock watch lookup error: %s", err.Error())
|
|
|
}
|
|
|
- waitIndex := resp.Node.ModifiedIndex
|
|
|
nodes := lockNodes{resp.Node.Nodes}
|
|
|
prevIndex := nodes.PrevIndex(index)
|
|
|
|
|
|
@@ -171,6 +173,13 @@ func (h *handler) watch(keypath string, index int, closeChan <-chan bool) error
|
|
|
}
|
|
|
|
|
|
// Watch previous index until it's gone.
|
|
|
+ waitIndex := resp.Node.ModifiedIndex
|
|
|
+
|
|
|
+ // Since event store has only 1000 histories we should use first node's CreatedIndex if available
|
|
|
+ if firstNode := nodes.First(); firstNode != nil {
|
|
|
+ waitIndex = firstNode.CreatedIndex
|
|
|
+ }
|
|
|
+
|
|
|
_, err = h.client.Watch(path.Join(keypath, strconv.Itoa(prevIndex)), waitIndex, false, nil, stopWatchChan)
|
|
|
if err == etcd.ErrWatchStoppedByUser {
|
|
|
return fmt.Errorf("lock watch closed")
|