Browse Source

fix(third_party): etcd from go-etcd was accidently gitignored

gitignore messup ignored thie directory. Add it!
Brandon Philips 12 years ago
parent
commit
3f07b37ada

+ 241 - 0
third_party/github.com/coreos/go-etcd/etcd/client.go

@@ -0,0 +1,241 @@
+package etcd
+
+import (
+	"crypto/tls"
+	"errors"
+	"io/ioutil"
+	"net"
+	"net/http"
+	"path"
+	"strings"
+	"time"
+)
+
+const (
+	HTTP = iota
+	HTTPS
+)
+
+type Cluster struct {
+	Leader   string
+	Machines []string
+}
+
+type Config struct {
+	CertFile string
+	KeyFile  string
+	Scheme   string
+	Timeout  time.Duration
+}
+
+type Client struct {
+	cluster    Cluster
+	config     Config
+	httpClient *http.Client
+}
+
+// Setup a basic conf and cluster
+func NewClient() *Client {
+
+	// default leader and machines
+	cluster := Cluster{
+		Leader:   "0.0.0.0:4001",
+		Machines: make([]string, 1),
+	}
+	cluster.Machines[0] = "0.0.0.0:4001"
+
+	config := Config{
+		// default use http
+		Scheme: "http",
+		// default timeout is one second
+		Timeout: time.Second,
+	}
+
+	tr := &http.Transport{
+		Dial: dialTimeout,
+		TLSClientConfig: &tls.Config{
+			InsecureSkipVerify: true,
+		},
+	}
+
+	return &Client{
+		cluster:    cluster,
+		config:     config,
+		httpClient: &http.Client{Transport: tr},
+	}
+
+}
+
+func (c *Client) SetCertAndKey(cert string, key string) (bool, error) {
+
+	if cert != "" && key != "" {
+		tlsCert, err := tls.LoadX509KeyPair(cert, key)
+
+		if err != nil {
+			return false, err
+		}
+
+		tr := &http.Transport{
+			TLSClientConfig: &tls.Config{
+				Certificates:       []tls.Certificate{tlsCert},
+				InsecureSkipVerify: true,
+			},
+			Dial: dialTimeout,
+		}
+
+		c.httpClient = &http.Client{Transport: tr}
+		return true, nil
+	}
+	return false, errors.New("Require both cert and key path")
+}
+
+func (c *Client) SetScheme(scheme int) (bool, error) {
+	if scheme == HTTP {
+		c.config.Scheme = "http"
+		return true, nil
+	}
+	if scheme == HTTPS {
+		c.config.Scheme = "https"
+		return true, nil
+	}
+	return false, errors.New("Unknown Scheme")
+}
+
+// Try to sync from the given machine
+func (c *Client) SetCluster(machines []string) bool {
+	success := c.internalSyncCluster(machines)
+	return success
+}
+
+// sycn cluster information using the existing machine list
+func (c *Client) SyncCluster() bool {
+	success := c.internalSyncCluster(c.cluster.Machines)
+	return success
+}
+
+// sync cluster information by providing machine list
+func (c *Client) internalSyncCluster(machines []string) bool {
+	for _, machine := range machines {
+		httpPath := c.createHttpPath(machine, "machines")
+		resp, err := c.httpClient.Get(httpPath)
+		if err != nil {
+			// try another machine in the cluster
+			continue
+		} else {
+			b, err := ioutil.ReadAll(resp.Body)
+			if err != nil {
+				// try another machine in the cluster
+				continue
+			}
+			// update Machines List
+			c.cluster.Machines = strings.Split(string(b), ",")
+			logger.Debug("sync.machines ", c.cluster.Machines)
+			return true
+		}
+	}
+	return false
+}
+
+// serverName should contain both hostName and port
+func (c *Client) createHttpPath(serverName string, _path string) string {
+	httpPath := path.Join(serverName, _path)
+	httpPath = c.config.Scheme + "://" + httpPath
+	return httpPath
+}
+
+// Dial with timeout.
+func dialTimeout(network, addr string) (net.Conn, error) {
+	return net.DialTimeout(network, addr, time.Second)
+}
+
+func (c *Client) getHttpPath(s ...string) string {
+	httpPath := path.Join(c.cluster.Leader, version)
+
+	for _, seg := range s {
+		httpPath = path.Join(httpPath, seg)
+	}
+
+	httpPath = c.config.Scheme + "://" + httpPath
+	return httpPath
+}
+
+func (c *Client) updateLeader(httpPath string) {
+	// httpPath http://127.0.0.1:4001/v1...
+	leader := strings.Split(httpPath, "://")[1]
+	// we want to have 127.0.0.1:4001
+
+	leader = strings.Split(leader, "/")[0]
+	logger.Debugf("update.leader[%s,%s]", c.cluster.Leader, leader)
+	c.cluster.Leader = leader
+}
+
+// Wrap GET, POST and internal error handling
+func (c *Client) sendRequest(method string, _path string, body string) (*http.Response, error) {
+
+	var resp *http.Response
+	var err error
+	var req *http.Request
+
+	retry := 0
+	// if we connect to a follower, we will retry until we found a leader
+	for {
+
+		httpPath := c.getHttpPath(_path)
+		logger.Debug("send.request.to ", httpPath)
+		if body == "" {
+
+			req, _ = http.NewRequest(method, httpPath, nil)
+
+		} else {
+			req, _ = http.NewRequest(method, httpPath, strings.NewReader(body))
+			req.Header.Set("Content-Type", "application/x-www-form-urlencoded; param=value")
+		}
+
+		resp, err = c.httpClient.Do(req)
+
+		logger.Debug("recv.response.from ", httpPath)
+		// network error, change a machine!
+		if err != nil {
+			retry++
+			if retry > 2*len(c.cluster.Machines) {
+				return nil, errors.New("Cannot reach servers")
+			}
+			num := retry % len(c.cluster.Machines)
+			logger.Debug("update.leader[", c.cluster.Leader, ",", c.cluster.Machines[num], "]")
+			c.cluster.Leader = c.cluster.Machines[num]
+			time.Sleep(time.Millisecond * 200)
+			continue
+		}
+
+		if resp != nil {
+			if resp.StatusCode == http.StatusTemporaryRedirect {
+				httpPath := resp.Header.Get("Location")
+
+				resp.Body.Close()
+
+				if httpPath == "" {
+					return nil, errors.New("Cannot get redirection location")
+				}
+
+				c.updateLeader(httpPath)
+				logger.Debug("send.redirect")
+				// try to connect the leader
+				continue
+			} else if resp.StatusCode == http.StatusInternalServerError {
+				retry++
+				if retry > 2*len(c.cluster.Machines) {
+					return nil, errors.New("Cannot reach servers")
+				}
+				resp.Body.Close()
+				continue
+			} else {
+				logger.Debug("send.return.response ", httpPath)
+				break
+			}
+
+		}
+		logger.Debug("error.from ", httpPath, " ", err.Error())
+		return nil, err
+	}
+	return resp, nil
+}

+ 38 - 0
third_party/github.com/coreos/go-etcd/etcd/client_test.go

@@ -0,0 +1,38 @@
+package etcd
+
+import (
+	"fmt"
+	"testing"
+)
+
+// To pass this test, we need to create a cluster of 3 machines
+// The server should be listening on 127.0.0.1:4001, 4002, 4003
+func TestSync(t *testing.T) {
+	fmt.Println("Make sure there are three nodes at 0.0.0.0:4001-4003")
+
+	c := NewClient()
+
+	success := c.SyncCluster()
+	if !success {
+		t.Fatal("cannot sync machines")
+	}
+
+	badMachines := []string{"abc", "edef"}
+
+	success = c.SetCluster(badMachines)
+
+	if success {
+		t.Fatal("should not sync on bad machines")
+	}
+
+	goodMachines := []string{"127.0.0.1:4002"}
+
+	success = c.SetCluster(goodMachines)
+
+	if !success {
+		t.Fatal("cannot sync machines")
+	} else {
+		fmt.Println(c.cluster.Machines)
+	}
+
+}

+ 19 - 0
third_party/github.com/coreos/go-etcd/etcd/debug.go

@@ -0,0 +1,19 @@
+package etcd
+
+import (
+	"github.com/ccding/go-logging/logging"
+)
+
+var logger, _ = logging.SimpleLogger("go-etcd")
+
+func init() {
+	logger.SetLevel(logging.FATAL)
+}
+
+func OpenDebug() {
+	logger.SetLevel(logging.NOTSET)
+}
+
+func CloseDebug() {
+	logger.SetLevel(logging.FATAL)
+}

+ 41 - 0
third_party/github.com/coreos/go-etcd/etcd/delete.go

@@ -0,0 +1,41 @@
+package etcd
+
+import (
+	"encoding/json"
+	"github.com/coreos/etcd/store"
+	"io/ioutil"
+	"net/http"
+	"path"
+)
+
+func (c *Client) Delete(key string) (*store.Response, error) {
+
+	resp, err := c.sendRequest("DELETE", path.Join("keys", key), "")
+
+	if err != nil {
+		return nil, err
+	}
+
+	b, err := ioutil.ReadAll(resp.Body)
+
+	resp.Body.Close()
+
+	if err != nil {
+		return nil, err
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, handleError(b)
+	}
+
+	var result store.Response
+
+	err = json.Unmarshal(b, &result)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return &result, nil
+
+}

+ 22 - 0
third_party/github.com/coreos/go-etcd/etcd/delete_test.go

@@ -0,0 +1,22 @@
+package etcd
+
+import (
+	"testing"
+)
+
+func TestDelete(t *testing.T) {
+
+	c := NewClient()
+
+	c.Set("foo", "bar", 100)
+	result, err := c.Delete("foo")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if result.PrevValue != "bar" || result.Value != "" {
+		t.Fatalf("Delete failed with %s %s", result.PrevValue,
+			result.Value)
+	}
+
+}

+ 24 - 0
third_party/github.com/coreos/go-etcd/etcd/error.go

@@ -0,0 +1,24 @@
+package etcd
+
+import (
+	"encoding/json"
+	"fmt"
+)
+
+type EtcdError struct {
+	ErrorCode int    `json:"errorCode"`
+	Message   string `json:"message"`
+	Cause     string `json:"cause,omitempty"`
+}
+
+func (e EtcdError) Error() string {
+	return fmt.Sprintf("%d: %s (%s)", e.ErrorCode, e.Message, e.Cause)
+}
+
+func handleError(b []byte) error {
+	var err EtcdError
+
+	json.Unmarshal(b, &err)
+
+	return err
+}

+ 83 - 0
third_party/github.com/coreos/go-etcd/etcd/get.go

@@ -0,0 +1,83 @@
+package etcd
+
+import (
+	"encoding/json"
+	"github.com/coreos/etcd/store"
+	"io/ioutil"
+	"net/http"
+	"path"
+)
+
+func (c *Client) Get(key string) ([]*store.Response, error) {
+	logger.Debugf("get %s [%s]", key, c.cluster.Leader)
+	resp, err := c.sendRequest("GET", path.Join("keys", key), "")
+
+	if err != nil {
+		return nil, err
+	}
+
+	b, err := ioutil.ReadAll(resp.Body)
+
+	resp.Body.Close()
+
+	if err != nil {
+		return nil, err
+	}
+
+	if resp.StatusCode != http.StatusOK {
+
+		return nil, handleError(b)
+	}
+
+	return convertGetResponse(b)
+
+}
+
+// GetTo gets the value of the key from a given machine address.
+// If the given machine is not available it returns an error.
+// Mainly use for testing purpose
+func (c *Client) GetFrom(key string, addr string) ([]*store.Response, error) {
+	httpPath := c.createHttpPath(addr, path.Join(version, "keys", key))
+
+	resp, err := c.httpClient.Get(httpPath)
+
+	if err != nil {
+		return nil, err
+	}
+
+	b, err := ioutil.ReadAll(resp.Body)
+
+	resp.Body.Close()
+
+	if err != nil {
+		return nil, err
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, handleError(b)
+	}
+
+	return convertGetResponse(b)
+}
+
+// Convert byte stream to response.
+func convertGetResponse(b []byte) ([]*store.Response, error) {
+
+	var results []*store.Response
+	var result *store.Response
+
+	err := json.Unmarshal(b, &result)
+
+	if err != nil {
+		err = json.Unmarshal(b, &results)
+
+		if err != nil {
+			return nil, err
+		}
+
+	} else {
+		results = make([]*store.Response, 1)
+		results[0] = result
+	}
+	return results, nil
+}

+ 46 - 0
third_party/github.com/coreos/go-etcd/etcd/get_test.go

@@ -0,0 +1,46 @@
+package etcd
+
+import (
+	"testing"
+	"time"
+)
+
+func TestGet(t *testing.T) {
+
+	c := NewClient()
+
+	c.Set("foo", "bar", 100)
+
+	// wait for commit
+	time.Sleep(100 * time.Millisecond)
+
+	results, err := c.Get("foo")
+
+	if err != nil || results[0].Key != "/foo" || results[0].Value != "bar" {
+		if err != nil {
+			t.Fatal(err)
+		}
+		t.Fatalf("Get failed with %s %s %v", results[0].Key, results[0].Value, results[0].TTL)
+	}
+
+	results, err = c.Get("goo")
+
+	if err == nil {
+		t.Fatalf("should not be able to get non-exist key")
+	}
+
+	results, err = c.GetFrom("foo", "0.0.0.0:4001")
+
+	if err != nil || results[0].Key != "/foo" || results[0].Value != "bar" {
+		if err != nil {
+			t.Fatal(err)
+		}
+		t.Fatalf("Get failed with %s %s %v", results[0].Key, results[0].Value, results[0].TTL)
+	}
+
+	results, err = c.GetFrom("foo", "0.0.0.0:4009")
+
+	if err == nil {
+		t.Fatal("should not get from port 4009")
+	}
+}

+ 23 - 0
third_party/github.com/coreos/go-etcd/etcd/list_test.go

@@ -0,0 +1,23 @@
+package etcd
+
+import (
+	"testing"
+	"time"
+)
+
+func TestList(t *testing.T) {
+	c := NewClient()
+
+	c.Set("foo_list/foo", "bar", 100)
+	c.Set("foo_list/fooo", "barbar", 100)
+	c.Set("foo_list/foooo/foo", "barbarbar", 100)
+	// wait for commit
+	time.Sleep(time.Second)
+
+	_, err := c.Get("foo_list")
+
+	if err != nil {
+		t.Fatal(err)
+	}
+
+}

+ 90 - 0
third_party/github.com/coreos/go-etcd/etcd/set.go

@@ -0,0 +1,90 @@
+package etcd
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/coreos/etcd/store"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"path"
+)
+
+func (c *Client) Set(key string, value string, ttl uint64) (*store.Response, error) {
+	logger.Debugf("set %s, %s, ttl: %d, [%s]", key, value, ttl, c.cluster.Leader)
+	v := url.Values{}
+	v.Set("value", value)
+
+	if ttl > 0 {
+		v.Set("ttl", fmt.Sprintf("%v", ttl))
+	}
+
+	resp, err := c.sendRequest("POST", path.Join("keys", key), v.Encode())
+
+	if err != nil {
+		return nil, err
+	}
+
+	b, err := ioutil.ReadAll(resp.Body)
+
+	resp.Body.Close()
+
+	if err != nil {
+		return nil, err
+	}
+
+	if resp.StatusCode != http.StatusOK {
+
+		return nil, handleError(b)
+	}
+
+	return convertSetResponse(b)
+
+}
+
+// SetTo sets the value of the key to a given machine address.
+// If the given machine is not available or is not leader it returns an error
+// Mainly use for testing purpose.
+func (c *Client) SetTo(key string, value string, ttl uint64, addr string) (*store.Response, error) {
+	v := url.Values{}
+	v.Set("value", value)
+
+	if ttl > 0 {
+		v.Set("ttl", fmt.Sprintf("%v", ttl))
+	}
+
+	httpPath := c.createHttpPath(addr, path.Join(version, "keys", key))
+
+	resp, err := c.httpClient.PostForm(httpPath, v)
+
+	if err != nil {
+		return nil, err
+	}
+
+	b, err := ioutil.ReadAll(resp.Body)
+
+	resp.Body.Close()
+
+	if err != nil {
+		return nil, err
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, handleError(b)
+	}
+
+	return convertSetResponse(b)
+}
+
+// Convert byte stream to response.
+func convertSetResponse(b []byte) (*store.Response, error) {
+	var result store.Response
+
+	err := json.Unmarshal(b, &result)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return &result, nil
+}

+ 42 - 0
third_party/github.com/coreos/go-etcd/etcd/set_test.go

@@ -0,0 +1,42 @@
+package etcd
+
+import (
+	"testing"
+	"time"
+)
+
+func TestSet(t *testing.T) {
+	c := NewClient()
+
+	result, err := c.Set("foo", "bar", 100)
+
+	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL != 99 {
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		t.Fatalf("Set 1 failed with %s %s %v", result.Key, result.Value, result.TTL)
+	}
+
+	time.Sleep(time.Second)
+
+	result, err = c.Set("foo", "bar", 100)
+
+	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.PrevValue != "bar" || result.TTL != 99 {
+		if err != nil {
+			t.Fatal(err)
+		}
+		t.Fatalf("Set 2 failed with %s %s %v", result.Key, result.Value, result.TTL)
+	}
+
+	result, err = c.SetTo("toFoo", "bar", 100, "0.0.0.0:4001")
+
+	if err != nil || result.Key != "/toFoo" || result.Value != "bar" || result.TTL != 99 {
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		t.Fatalf("SetTo failed with %s %s %v", result.Key, result.Value, result.TTL)
+	}
+
+}

+ 57 - 0
third_party/github.com/coreos/go-etcd/etcd/testAndSet.go

@@ -0,0 +1,57 @@
+package etcd
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/coreos/etcd/store"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"path"
+)
+
+func (c *Client) TestAndSet(key string, prevValue string, value string, ttl uint64) (*store.Response, bool, error) {
+	logger.Debugf("set %s, %s[%s], ttl: %d, [%s]", key, value, prevValue, ttl, c.cluster.Leader)
+	v := url.Values{}
+	v.Set("value", value)
+	v.Set("prevValue", prevValue)
+
+	if ttl > 0 {
+		v.Set("ttl", fmt.Sprintf("%v", ttl))
+	}
+
+	resp, err := c.sendRequest("POST", path.Join("keys", key), v.Encode())
+
+	if err != nil {
+		return nil, false, err
+	}
+
+	b, err := ioutil.ReadAll(resp.Body)
+
+	resp.Body.Close()
+
+	if err != nil {
+
+		return nil, false, err
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, false, handleError(b)
+	}
+
+	var result store.Response
+
+	err = json.Unmarshal(b, &result)
+
+	if err != nil {
+		return nil, false, err
+	}
+
+	if result.PrevValue == prevValue && result.Value == value {
+
+		return &result, true, nil
+	}
+
+	return &result, false, nil
+
+}

+ 39 - 0
third_party/github.com/coreos/go-etcd/etcd/testAndSet_test.go

@@ -0,0 +1,39 @@
+package etcd
+
+import (
+	"testing"
+	"time"
+)
+
+func TestTestAndSet(t *testing.T) {
+	c := NewClient()
+
+	c.Set("foo_testAndSet", "bar", 100)
+
+	time.Sleep(time.Second)
+
+	results := make(chan bool, 3)
+
+	for i := 0; i < 3; i++ {
+		testAndSet("foo_testAndSet", "bar", "barbar", results, c)
+	}
+
+	count := 0
+
+	for i := 0; i < 3; i++ {
+		result := <-results
+		if result {
+			count++
+		}
+	}
+
+	if count != 1 {
+		t.Fatalf("test and set fails %v", count)
+	}
+
+}
+
+func testAndSet(key string, prevValue string, value string, ch chan bool, c *Client) {
+	_, success, _ := c.TestAndSet(key, prevValue, value, 0)
+	ch <- success
+}

+ 3 - 0
third_party/github.com/coreos/go-etcd/etcd/version.go

@@ -0,0 +1,3 @@
+package etcd
+
+var version = "v1"

+ 117 - 0
third_party/github.com/coreos/go-etcd/etcd/watch.go

@@ -0,0 +1,117 @@
+package etcd
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"github.com/coreos/etcd/store"
+	"io/ioutil"
+	"net/http"
+	"net/url"
+	"path"
+)
+
+type respAndErr struct {
+	resp *http.Response
+	err  error
+}
+
+// Watch any change under the given prefix.
+// When a sinceIndex is given, watch will try to scan from that index to the last index
+// and will return any changes under the given prefix during the history
+// If a receiver channel is given, it will be a long-term watch. Watch will block at the
+// channel. And after someone receive the channel, it will go on to watch that prefix.
+// If a stop channel is given, client can close long-term watch using the stop channel
+
+func (c *Client) Watch(prefix string, sinceIndex uint64, receiver chan *store.Response, stop chan bool) (*store.Response, error) {
+	logger.Debugf("watch %s [%s]", prefix, c.cluster.Leader)
+	if receiver == nil {
+		return c.watchOnce(prefix, sinceIndex, stop)
+
+	} else {
+		for {
+			resp, err := c.watchOnce(prefix, sinceIndex, stop)
+			if resp != nil {
+				sinceIndex = resp.Index + 1
+				receiver <- resp
+			} else {
+				return nil, err
+			}
+		}
+	}
+
+	return nil, nil
+}
+
+// helper func
+// return when there is change under the given prefix
+func (c *Client) watchOnce(key string, sinceIndex uint64, stop chan bool) (*store.Response, error) {
+
+	var resp *http.Response
+	var err error
+
+	if sinceIndex == 0 {
+		// Get request if no index is given
+		resp, err = c.sendRequest("GET", path.Join("watch", key), "")
+
+		if err != nil {
+			return nil, err
+		}
+
+	} else {
+
+		// Post
+		v := url.Values{}
+		v.Set("index", fmt.Sprintf("%v", sinceIndex))
+
+		ch := make(chan respAndErr)
+
+		if stop != nil {
+			go func() {
+				resp, err = c.sendRequest("POST", path.Join("watch", key), v.Encode())
+
+				ch <- respAndErr{resp, err}
+			}()
+
+			// select at stop or continue to receive
+			select {
+
+			case res := <-ch:
+				resp, err = res.resp, res.err
+
+			case <-stop:
+				resp, err = nil, errors.New("User stoped watch")
+			}
+		} else {
+			resp, err = c.sendRequest("POST", path.Join("watch", key), v.Encode())
+		}
+
+		if err != nil {
+			return nil, err
+		}
+
+	}
+
+	b, err := ioutil.ReadAll(resp.Body)
+
+	resp.Body.Close()
+
+	if err != nil {
+		return nil, err
+	}
+
+	if resp.StatusCode != http.StatusOK {
+
+		return nil, handleError(b)
+	}
+
+	var result store.Response
+
+	err = json.Unmarshal(b, &result)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return &result, nil
+}

+ 62 - 0
third_party/github.com/coreos/go-etcd/etcd/watch_test.go

@@ -0,0 +1,62 @@
+package etcd
+
+import (
+	"fmt"
+	"github.com/coreos/etcd/store"
+	"testing"
+	"time"
+)
+
+func TestWatch(t *testing.T) {
+	c := NewClient()
+
+	go setHelper("bar", c)
+
+	result, err := c.Watch("watch_foo", 0, nil, nil)
+
+	if err != nil || result.Key != "/watch_foo/foo" || result.Value != "bar" {
+		if err != nil {
+			t.Fatal(err)
+		}
+		t.Fatalf("Watch failed with %s %s %v %v", result.Key, result.Value, result.TTL, result.Index)
+	}
+
+	result, err = c.Watch("watch_foo", result.Index, nil, nil)
+
+	if err != nil || result.Key != "/watch_foo/foo" || result.Value != "bar" {
+		if err != nil {
+			t.Fatal(err)
+		}
+		t.Fatalf("Watch with Index failed with %s %s %v %v", result.Key, result.Value, result.TTL, result.Index)
+	}
+
+	ch := make(chan *store.Response, 10)
+	stop := make(chan bool, 1)
+
+	go setLoop("bar", c)
+
+	go reciver(ch, stop)
+
+	c.Watch("watch_foo", 0, ch, stop)
+}
+
+func setHelper(value string, c *Client) {
+	time.Sleep(time.Second)
+	c.Set("watch_foo/foo", value, 100)
+}
+
+func setLoop(value string, c *Client) {
+	time.Sleep(time.Second)
+	for i := 0; i < 10; i++ {
+		newValue := fmt.Sprintf("%s_%v", value, i)
+		c.Set("watch_foo/foo", newValue, 100)
+		time.Sleep(time.Second / 10)
+	}
+}
+
+func reciver(c chan *store.Response, stop chan bool) {
+	for i := 0; i < 10; i++ {
+		<-c
+	}
+	stop <- true
+}