Browse Source

Merge pull request #347 from benbjohnson/mod-lock

WIP: mod/lock
Brandon Philips 12 years ago
parent
commit
af20be8123

+ 128 - 0
mod/lock/v2/acquire_handler.go

@@ -0,0 +1,128 @@
+package v2
+
+import (
+	"net/http"
+	"path"
+	"strconv"
+	"time"
+
+	"github.com/coreos/go-etcd/etcd"
+	"github.com/gorilla/mux"
+)
+
+// acquireHandler attempts to acquire a lock on the given key.
+// The "key" parameter specifies the resource to lock.
+// The "ttl" parameter specifies how long the lock will persist for.
+// The "timeout" parameter specifies how long the request should wait for the lock.
+func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) {
+	h.client.SyncCluster()
+
+	// Setup connection watcher.
+	closeNotifier, _ := w.(http.CloseNotifier)
+	closeChan := closeNotifier.CloseNotify()
+
+	// Parse "key" and "ttl" query parameters.
+	vars := mux.Vars(req)
+	keypath := path.Join(prefix, vars["key"])
+	ttl, err := strconv.Atoi(req.FormValue("ttl"))
+	if err != nil {
+		http.Error(w, "invalid ttl: " + err.Error(), http.StatusInternalServerError)
+		return
+	}
+	
+	// Parse "timeout" parameter.
+	var timeout int
+	if len(req.FormValue("timeout")) == 0 {
+		timeout = -1
+	} else if timeout, err = strconv.Atoi(req.FormValue("timeout")); err != nil {
+		http.Error(w, "invalid timeout: " + err.Error(), http.StatusInternalServerError)
+		return
+	}
+	timeout = timeout + 1
+
+	// Create an incrementing id for the lock.
+	resp, err := h.client.AddChild(keypath, "-", uint64(ttl))
+	if err != nil {
+		http.Error(w, "add lock index error: " + err.Error(), http.StatusInternalServerError)
+		return
+	}
+	indexpath := resp.Node.Key
+
+	// Keep updating TTL to make sure lock request is not expired before acquisition.
+	stop := make(chan bool)
+	go h.ttlKeepAlive(indexpath, ttl, stop)
+
+	// Monitor for broken connection.
+	stopWatchChan := make(chan bool)
+	go func() {
+		select {
+		case <-closeChan:
+			stopWatchChan <- true
+		case <-stop:
+			// Stop watching for connection disconnect.
+		}
+	}()
+
+	// Extract the lock index.
+	index, _ := strconv.Atoi(path.Base(resp.Node.Key))
+
+	// Wait until we successfully get a lock or we get a failure.
+	var success bool
+	for {
+		// Read all indices.
+		resp, err = h.client.Get(keypath, true, true)
+		if err != nil {
+			http.Error(w, "lock children lookup error: " + err.Error(), http.StatusInternalServerError)
+			break
+		}
+		indices := extractResponseIndices(resp)
+		waitIndex := resp.Node.ModifiedIndex
+		prevIndex := findPrevIndex(indices, index)
+
+		// If there is no previous index then we have the lock.
+		if prevIndex == 0 {
+			success = true
+			break
+		}
+
+		// Otherwise watch previous index until it's gone.
+		_, err = h.client.Watch(path.Join(keypath, strconv.Itoa(prevIndex)), waitIndex, false, nil, stopWatchChan)
+		if err == etcd.ErrWatchStoppedByUser {
+			break
+		} else if err != nil {
+			http.Error(w, "lock watch error: " + err.Error(), http.StatusInternalServerError)
+			break
+		}
+	}
+
+	// Check for connection disconnect before we write the lock index.
+	select {
+	case <-stopWatchChan:
+		success = false
+	default:
+	}
+
+	// Stop the ttl keep-alive.
+	close(stop)
+
+	if success {
+		// Write lock index to response body if we acquire the lock.
+		h.client.Update(indexpath, "-", uint64(ttl))
+		w.Write([]byte(strconv.Itoa(index)))
+	} else {
+		// Make sure key is deleted if we couldn't acquire.
+		h.client.Delete(indexpath, false)
+	}
+}
+
+// ttlKeepAlive continues to update a key's TTL until the stop channel is closed.
+func (h *handler) ttlKeepAlive(k string, ttl int, stop chan bool) {
+	for {
+		select {
+		case <-time.After(time.Duration(ttl / 2) * time.Second):
+			h.client.Update(k, "-", uint64(ttl))
+		case <-stop:
+			return
+		}
+	}
+}

+ 30 - 0
mod/lock/v2/get_index_handler.go

@@ -0,0 +1,30 @@
+package v2
+
+import (
+	"net/http"
+	"path"
+	"strconv"
+
+	"github.com/gorilla/mux"
+)
+
+// getIndexHandler retrieves the current lock index.
+func (h *handler) getIndexHandler(w http.ResponseWriter, req *http.Request) {
+	h.client.SyncCluster()
+
+	vars := mux.Vars(req)
+	keypath := path.Join(prefix, vars["key"])
+
+	// Read all indices.
+	resp, err := h.client.Get(keypath, true, true)
+	if err != nil {
+		http.Error(w, "lock children lookup error: " + err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	// Write out the index of the last one to the response body.
+	indices := extractResponseIndices(resp)
+	if len(indices) > 0 {
+		w.Write([]byte(strconv.Itoa(indices[0])))
+	}
+}

+ 58 - 0
mod/lock/v2/handler.go

@@ -0,0 +1,58 @@
+package v2
+
+import (
+	"net/http"
+	"path"
+	"strconv"
+	"sort"
+
+	"github.com/gorilla/mux"
+	"github.com/coreos/go-etcd/etcd"
+)
+
+const prefix = "/_etcd/mod/lock"
+
+// handler manages the lock HTTP request.
+type handler struct {
+	*mux.Router
+	client *etcd.Client
+}
+
+// NewHandler creates an HTTP handler that can be registered on a router.
+func NewHandler(addr string) (http.Handler) {
+	h := &handler{
+		Router: mux.NewRouter(),
+		client: etcd.NewClient([]string{addr}),
+	}
+	h.StrictSlash(false)
+	h.HandleFunc("/{key:.*}", h.getIndexHandler).Methods("GET")
+	h.HandleFunc("/{key:.*}", h.acquireHandler).Methods("POST")
+	h.HandleFunc("/{key_with_index:.*}", h.renewLockHandler).Methods("PUT")
+	h.HandleFunc("/{key_with_index:.*}", h.releaseLockHandler).Methods("DELETE")
+	return h
+}
+
+
+// extractResponseIndices extracts a sorted list of indicies from a response.
+func extractResponseIndices(resp *etcd.Response) []int {
+	var indices []int
+	for _, node := range resp.Node.Nodes {
+		if index, _ := strconv.Atoi(path.Base(node.Key)); index > 0 {
+			indices = append(indices, index)
+		}
+	}
+	sort.Ints(indices)
+	return indices
+}
+
+// findPrevIndex retrieves the previous index before the given index.
+func findPrevIndex(indices []int, idx int) int {
+	var prevIndex int
+	for _, index := range indices {
+		if index == idx {
+			break
+		}
+		prevIndex = index
+	}
+	return prevIndex
+}

+ 24 - 0
mod/lock/v2/release_handler.go

@@ -0,0 +1,24 @@
+package v2
+
+import (
+	"path"
+	"net/http"
+
+	"github.com/gorilla/mux"
+)
+
+// releaseLockHandler deletes the lock.
+func (h *handler) releaseLockHandler(w http.ResponseWriter, req *http.Request) {
+	h.client.SyncCluster()
+
+	vars := mux.Vars(req)
+	keypath := path.Join(prefix, vars["key_with_index"])
+
+	// Delete the lock.
+	_, err := h.client.Delete(keypath, false)
+	if err != nil {
+		http.Error(w, "delete lock index error: " + err.Error(), http.StatusInternalServerError)
+		return
+	}
+}
+

+ 30 - 0
mod/lock/v2/renew_handler.go

@@ -0,0 +1,30 @@
+package v2
+
+import (
+	"path"
+	"net/http"
+	"strconv"
+
+	"github.com/gorilla/mux"
+)
+
+// renewLockHandler attempts to update the TTL on an existing lock.
+// Returns a 200 OK if successful. Returns non-200 on error.
+func (h *handler) renewLockHandler(w http.ResponseWriter, req *http.Request) {
+	h.client.SyncCluster()
+
+	vars := mux.Vars(req)
+	keypath := path.Join(prefix, vars["key_with_index"])
+	ttl, err := strconv.Atoi(req.FormValue("ttl"))
+	if err != nil {
+		http.Error(w, "invalid ttl: " + err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	// Renew the lock, if it exists.
+	_, err = h.client.Update(keypath, "-", uint64(ttl))
+	if err != nil {
+		http.Error(w, "renew lock index error: " + err.Error(), http.StatusInternalServerError)
+		return
+	}
+}

+ 188 - 0
mod/lock/v2/tests/handler_test.go

@@ -0,0 +1,188 @@
+package lock
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/server"
+	"github.com/coreos/etcd/tests"
+	"github.com/stretchr/testify/assert"
+)
+
+// Ensure that a lock can be acquired and released.
+func TestModLockAcquireAndRelease(t *testing.T) {
+	tests.RunServer(func(s *server.Server) {
+		// Acquire lock.
+		body, err := testAcquireLock(s, "foo", 10)
+		assert.NoError(t, err)
+		assert.Equal(t, body, "2")
+
+		// Check that we have the lock.
+		body, err = testGetLockIndex(s, "foo")
+		assert.NoError(t, err)
+		assert.Equal(t, body, "2")
+
+		// Release lock.
+		body, err = testReleaseLock(s, "foo", 2)
+		assert.NoError(t, err)
+		assert.Equal(t, body, "")
+
+		// Check that we have the lock.
+		body, err = testGetLockIndex(s, "foo")
+		assert.NoError(t, err)
+		assert.Equal(t, body, "")
+	})
+}
+
+// Ensure that a lock can be acquired and another process is blocked until released.
+func TestModLockBlockUntilAcquire(t *testing.T) {
+	tests.RunServer(func(s *server.Server) {
+		c := make(chan bool)
+
+		// Acquire lock #1.
+		go func() {
+			body, err := testAcquireLock(s, "foo", 10)
+			assert.NoError(t, err)
+			assert.Equal(t, body, "2")
+			c <- true
+		}()
+		<- c
+
+		// Acquire lock #2.
+		go func() {
+			c <- true
+			body, err := testAcquireLock(s, "foo", 10)
+			assert.NoError(t, err)
+			assert.Equal(t, body, "4")
+		}()
+		<- c
+
+		time.Sleep(1 * time.Second)
+
+		// Check that we have the lock #1.
+		body, err := testGetLockIndex(s, "foo")
+		assert.NoError(t, err)
+		assert.Equal(t, body, "2")
+
+		// Release lock #1.
+		body, err = testReleaseLock(s, "foo", 2)
+		assert.NoError(t, err)
+
+		// Check that we have lock #2.
+		body, err = testGetLockIndex(s, "foo")
+		assert.NoError(t, err)
+		assert.Equal(t, body, "4")
+
+		// Release lock #2.
+		body, err = testReleaseLock(s, "foo", 4)
+		assert.NoError(t, err)
+
+		// Check that we have no lock.
+		body, err = testGetLockIndex(s, "foo")
+		assert.NoError(t, err)
+		assert.Equal(t, body, "")
+	})
+}
+
+// Ensure that a lock will be released after the TTL.
+func TestModLockExpireAndRelease(t *testing.T) {
+	tests.RunServer(func(s *server.Server) {
+		c := make(chan bool)
+
+		// Acquire lock #1.
+		go func() {
+			body, err := testAcquireLock(s, "foo", 2)
+			assert.NoError(t, err)
+			assert.Equal(t, body, "2")
+			c <- true
+		}()
+		<- c
+
+		// Acquire lock #2.
+		go func() {
+			c <- true
+			body, err := testAcquireLock(s, "foo", 10)
+			assert.NoError(t, err)
+			assert.Equal(t, body, "4")
+		}()
+		<- c
+
+		time.Sleep(1 * time.Second)
+
+		// Check that we have the lock #1.
+		body, err := testGetLockIndex(s, "foo")
+		assert.NoError(t, err)
+		assert.Equal(t, body, "2")
+
+		// Wait for lock #1 TTL.
+		time.Sleep(2 * time.Second)
+
+		// Check that we have lock #2.
+		body, err = testGetLockIndex(s, "foo")
+		assert.NoError(t, err)
+		assert.Equal(t, body, "4")
+	})
+}
+
+// Ensure that a lock can be renewed.
+func TestModLockRenew(t *testing.T) {
+	tests.RunServer(func(s *server.Server) {
+		// Acquire lock.
+		body, err := testAcquireLock(s, "foo", 3)
+		assert.NoError(t, err)
+		assert.Equal(t, body, "2")
+
+		time.Sleep(2 * time.Second)
+
+		// Check that we have the lock.
+		body, err = testGetLockIndex(s, "foo")
+		assert.NoError(t, err)
+		assert.Equal(t, body, "2")
+
+		// Renew lock.
+		body, err = testRenewLock(s, "foo", 2, 3)
+		assert.NoError(t, err)
+		assert.Equal(t, body, "")
+
+		time.Sleep(2 * time.Second)
+
+		// Check that we still have the lock.
+		body, err = testGetLockIndex(s, "foo")
+		assert.NoError(t, err)
+		assert.Equal(t, body, "2")
+
+		time.Sleep(2 * time.Second)
+
+		// Check that lock was released.
+		body, err = testGetLockIndex(s, "foo")
+		assert.NoError(t, err)
+		assert.Equal(t, body, "")
+	})
+}
+
+
+
+func testAcquireLock(s *server.Server, key string, ttl int) (string, error) {
+	resp, err := tests.PostForm(fmt.Sprintf("%s/mod/v2/lock/%s?ttl=%d", s.URL(), key, ttl), nil)
+	ret := tests.ReadBody(resp)
+	return string(ret), err
+}
+
+func testGetLockIndex(s *server.Server, key string) (string, error) {
+	resp, err := tests.Get(fmt.Sprintf("%s/mod/v2/lock/%s", s.URL(), key))
+	ret := tests.ReadBody(resp)
+	return string(ret), err
+}
+
+func testReleaseLock(s *server.Server, key string, index int) (string, error) {
+	resp, err := tests.DeleteForm(fmt.Sprintf("%s/mod/v2/lock/%s/%d", s.URL(), key, index), nil)
+	ret := tests.ReadBody(resp)
+	return string(ret), err
+}
+
+func testRenewLock(s *server.Server, key string, index int, ttl int) (string, error) {
+	resp, err := tests.PutForm(fmt.Sprintf("%s/mod/v2/lock/%s/%d?ttl=%d", s.URL(), key, index, ttl), nil)
+	ret := tests.ReadBody(resp)
+	return string(ret), err
+}

+ 8 - 6
mod/mod.go

@@ -6,6 +6,7 @@ import (
 	"path"
 
 	"github.com/coreos/etcd/mod/dashboard"
+	lock2 "github.com/coreos/etcd/mod/lock/v2"
 	"github.com/gorilla/mux"
 )
 
@@ -16,11 +17,12 @@ func addSlash(w http.ResponseWriter, req *http.Request) {
 	return
 }
 
-func HttpHandler() (handler http.Handler) {
-	modMux := mux.NewRouter()
-	modMux.HandleFunc("/dashboard", addSlash)
-	modMux.PathPrefix("/dashboard/").
-		Handler(http.StripPrefix("/dashboard/", dashboard.HttpHandler()))
+func HttpHandler(addr string) http.Handler {
+	r := mux.NewRouter()
+	r.HandleFunc("/dashboard", addSlash)
+	r.PathPrefix("/dashboard/").Handler(http.StripPrefix("/dashboard/", dashboard.HttpHandler()))
 
-	return modMux
+	// TODO: Use correct addr.
+	r.PathPrefix("/v2/lock").Handler(http.StripPrefix("/v2/lock", lock2.NewHandler(addr)))
+	return r
 }

+ 1 - 1
server/server.go

@@ -135,7 +135,7 @@ func (s *Server) installV2() {
 
 func (s *Server) installMod() {
 	r := s.router
-	r.PathPrefix("/mod").Handler(http.StripPrefix("/mod", mod.HttpHandler()))
+	r.PathPrefix("/mod").Handler(http.StripPrefix("/mod", mod.HttpHandler(s.url)))
 }
 
 // Adds a v1 server handler to the router.

+ 3 - 3
server/v2/tests/delete_handler_test.go

@@ -19,11 +19,11 @@ func TestV2DeleteKey(t *testing.T) {
 	tests.RunServer(func(s *server.Server) {
 		v := url.Values{}
 		v.Set("value", "XXX")
-		resp, err := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		tests.ReadBody(resp)
-		resp, err = tests.DeleteForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), url.Values{})
+		resp, err = tests.DeleteForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), url.Values{})
 		body := tests.ReadBody(resp)
 		assert.Nil(t, err, "")
-		assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo/bar","prevValue":"XXX","modifiedIndex":2,"createdIndex":1}}`, "")
+		assert.Equal(t, string(body), `{"action":"delete","node":{"key":"/foo/bar","prevValue":"XXX","modifiedIndex":3,"createdIndex":2}}`, "")
 	})
 }

+ 14 - 16
server/v2/tests/get_handler_test.go

@@ -20,16 +20,15 @@ func TestV2GetKey(t *testing.T) {
 	tests.RunServer(func(s *server.Server) {
 		v := url.Values{}
 		v.Set("value", "XXX")
-		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		tests.ReadBody(resp)
-		resp, _ = tests.Get(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"))
+		resp, _ = tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"))
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["action"], "get", "")
-
 		node := body["node"].(map[string]interface{})
 		assert.Equal(t, node["key"], "/foo/bar", "")
 		assert.Equal(t, node["value"], "XXX", "")
-		assert.Equal(t, node["modifiedIndex"], 1, "")
+		assert.Equal(t, node["modifiedIndex"], 2, "")
 	})
 }
 
@@ -44,21 +43,20 @@ func TestV2GetKeyRecursively(t *testing.T) {
 		v := url.Values{}
 		v.Set("value", "XXX")
 		v.Set("ttl", "10")
-		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/x"), v)
+		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/x"), v)
 		tests.ReadBody(resp)
 
 		v.Set("value", "YYY")
-		resp, _ = tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/y/z"), v)
+		resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/y/z"), v)
 		tests.ReadBody(resp)
 
-		resp, _ = tests.Get(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo?recursive=true"))
+		resp, _ = tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo?recursive=true"))
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["action"], "get", "")
-
 		node := body["node"].(map[string]interface{})
 		assert.Equal(t, node["key"], "/foo", "")
 		assert.Equal(t, node["dir"], true, "")
-		assert.Equal(t, node["modifiedIndex"], 1, "")
+		assert.Equal(t, node["modifiedIndex"], 2, "")
 		assert.Equal(t, len(node["nodes"].([]interface{})), 2, "")
 
 		node0 := node["nodes"].([]interface{})[0].(map[string]interface{})
@@ -86,7 +84,7 @@ func TestV2WatchKey(t *testing.T) {
 		var body map[string]interface{}
 		c := make(chan bool)
 		go func() {
-			resp, _ := tests.Get(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?wait=true"))
+			resp, _ := tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?wait=true"))
 			body = tests.ReadBodyJSON(resp)
 			c <- true
 		}()
@@ -98,7 +96,7 @@ func TestV2WatchKey(t *testing.T) {
 		// Set a value.
 		v := url.Values{}
 		v.Set("value", "XXX")
-		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		tests.ReadBody(resp)
 
 		// A response should follow from the GET above.
@@ -117,7 +115,7 @@ func TestV2WatchKey(t *testing.T) {
 		node := body["node"].(map[string]interface{})
 		assert.Equal(t, node["key"], "/foo/bar", "")
 		assert.Equal(t, node["value"], "XXX", "")
-		assert.Equal(t, node["modifiedIndex"], 1, "")
+		assert.Equal(t, node["modifiedIndex"], 2, "")
 	})
 }
 
@@ -132,7 +130,7 @@ func TestV2WatchKeyWithIndex(t *testing.T) {
 		var body map[string]interface{}
 		c := make(chan bool)
 		go func() {
-			resp, _ := tests.Get(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar?wait=true&waitIndex=2"))
+			resp, _ := tests.Get(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar?wait=true&waitIndex=3"))
 			body = tests.ReadBodyJSON(resp)
 			c <- true
 		}()
@@ -144,7 +142,7 @@ func TestV2WatchKeyWithIndex(t *testing.T) {
 		// Set a value (before given index).
 		v := url.Values{}
 		v.Set("value", "XXX")
-		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		tests.ReadBody(resp)
 
 		// Make sure response didn't fire early.
@@ -153,7 +151,7 @@ func TestV2WatchKeyWithIndex(t *testing.T) {
 
 		// Set a value (before given index).
 		v.Set("value", "YYY")
-		resp, _ = tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		tests.ReadBody(resp)
 
 		// A response should follow from the GET above.
@@ -172,6 +170,6 @@ func TestV2WatchKeyWithIndex(t *testing.T) {
 		node := body["node"].(map[string]interface{})
 		assert.Equal(t, node["key"], "/foo/bar", "")
 		assert.Equal(t, node["value"], "YYY", "")
-		assert.Equal(t, node["modifiedIndex"], 2, "")
+		assert.Equal(t, node["modifiedIndex"], 3, "")
 	})
 }

+ 9 - 7
server/v2/tests/post_handler_test.go

@@ -18,25 +18,27 @@ import (
 func TestV2CreateUnique(t *testing.T) {
 	tests.RunServer(func(s *server.Server) {
 		// POST should add index to list.
-		resp, _ := tests.PostForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), nil)
+		resp, _ := tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), nil)
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["action"], "create", "")
 
 		node := body["node"].(map[string]interface{})
-		assert.Equal(t, node["key"], "/foo/bar/1", "")
+		assert.Equal(t, node["key"], "/foo/bar/2", "")
 		assert.Equal(t, node["dir"], true, "")
-		assert.Equal(t, node["modifiedIndex"], 1, "")
+		assert.Equal(t, node["modifiedIndex"], 2, "")
 
 		// Second POST should add next index to list.
-		resp, _ = tests.PostForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), nil)
+		resp, _ = tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), nil)
 		body = tests.ReadBodyJSON(resp)
+
 		node = body["node"].(map[string]interface{})
-		assert.Equal(t, node["key"], "/foo/bar/2", "")
+		assert.Equal(t, node["key"], "/foo/bar/3", "")
 
 		// POST to a different key should add index to that list.
-		resp, _ = tests.PostForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/baz"), nil)
+		resp, _ = tests.PostForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/baz"), nil)
 		body = tests.ReadBodyJSON(resp)
+
 		node = body["node"].(map[string]interface{})
-		assert.Equal(t, node["key"], "/foo/baz/3", "")
+		assert.Equal(t, node["key"], "/foo/baz/4", "")
 	})
 }

+ 29 - 31
server/v2/tests/put_handler_test.go

@@ -19,10 +19,10 @@ func TestV2SetKey(t *testing.T) {
 	tests.RunServer(func(s *server.Server) {
 		v := url.Values{}
 		v.Set("value", "XXX")
-		resp, err := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, err := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBody(resp)
 		assert.Nil(t, err, "")
-		assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"XXX","modifiedIndex":1,"createdIndex":1}}`, "")
+		assert.Equal(t, string(body), `{"action":"set","node":{"key":"/foo/bar","value":"XXX","modifiedIndex":2,"createdIndex":2}}`, "")
 	})
 }
 
@@ -36,7 +36,7 @@ func TestV2SetKeyWithTTL(t *testing.T) {
 		v := url.Values{}
 		v.Set("value", "XXX")
 		v.Set("ttl", "20")
-		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBodyJSON(resp)
 		node := body["node"].(map[string]interface{})
 		assert.Equal(t, node["ttl"], 20, "")
@@ -56,7 +56,7 @@ func TestV2SetKeyWithBadTTL(t *testing.T) {
 		v := url.Values{}
 		v.Set("value", "XXX")
 		v.Set("ttl", "bad_ttl")
-		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["errorCode"], 202, "")
 		assert.Equal(t, body["message"], "The given TTL in POST form is not a number", "")
@@ -73,7 +73,7 @@ func TestV2CreateKeySuccess(t *testing.T) {
 		v := url.Values{}
 		v.Set("value", "XXX")
 		v.Set("prevExist", "false")
-		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBodyJSON(resp)
 		node := body["node"].(map[string]interface{})
 		assert.Equal(t, node["value"], "XXX", "")
@@ -90,9 +90,9 @@ func TestV2CreateKeyFail(t *testing.T) {
 		v := url.Values{}
 		v.Set("value", "XXX")
 		v.Set("prevExist", "false")
-		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		tests.ReadBody(resp)
-		resp, _ = tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["errorCode"], 105, "")
 		assert.Equal(t, body["message"], "Already exists", "")
@@ -110,12 +110,12 @@ func TestV2UpdateKeySuccess(t *testing.T) {
 		v := url.Values{}
 
 		v.Set("value", "XXX")
-		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		tests.ReadBody(resp)
 
 		v.Set("value", "YYY")
 		v.Set("prevExist", "true")
-		resp, _ = tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["action"], "update", "")
 
@@ -131,11 +131,11 @@ func TestV2UpdateKeySuccess(t *testing.T) {
 func TestV2UpdateKeyFailOnValue(t *testing.T) {
 	tests.RunServer(func(s *server.Server) {
 		v := url.Values{}
-		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo"), v)
+		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo"), v)
 
 		v.Set("value", "YYY")
 		v.Set("prevExist", "true")
-		resp, _ = tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["errorCode"], 100, "")
 		assert.Equal(t, body["message"], "Key Not Found", "")
@@ -153,7 +153,7 @@ func TestV2UpdateKeyFailOnMissingDirectory(t *testing.T) {
 		v := url.Values{}
 		v.Set("value", "YYY")
 		v.Set("prevExist", "true")
-		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["errorCode"], 100, "")
 		assert.Equal(t, body["message"], "Key Not Found", "")
@@ -170,18 +170,17 @@ func TestV2SetKeyCASOnIndexSuccess(t *testing.T) {
 	tests.RunServer(func(s *server.Server) {
 		v := url.Values{}
 		v.Set("value", "XXX")
-		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		tests.ReadBody(resp)
 		v.Set("value", "YYY")
-		v.Set("prevIndex", "1")
-		resp, _ = tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		v.Set("prevIndex", "2")
+		resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["action"], "compareAndSwap", "")
-
 		node := body["node"].(map[string]interface{})
 		assert.Equal(t, node["prevValue"], "XXX", "")
 		assert.Equal(t, node["value"], "YYY", "")
-		assert.Equal(t, node["modifiedIndex"], 2, "")
+		assert.Equal(t, node["modifiedIndex"], 3, "")
 	})
 }
 
@@ -194,16 +193,16 @@ func TestV2SetKeyCASOnIndexFail(t *testing.T) {
 	tests.RunServer(func(s *server.Server) {
 		v := url.Values{}
 		v.Set("value", "XXX")
-		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		tests.ReadBody(resp)
 		v.Set("value", "YYY")
 		v.Set("prevIndex", "10")
-		resp, _ = tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["errorCode"], 101, "")
 		assert.Equal(t, body["message"], "Test Failed", "")
-		assert.Equal(t, body["cause"], "[ != XXX] [10 != 1]", "")
-		assert.Equal(t, body["index"], 1, "")
+		assert.Equal(t, body["cause"], "[ != XXX] [10 != 2]", "")
+		assert.Equal(t, body["index"], 2, "")
 	})
 }
 
@@ -216,7 +215,7 @@ func TestV2SetKeyCASWithInvalidIndex(t *testing.T) {
 		v := url.Values{}
 		v.Set("value", "YYY")
 		v.Set("prevIndex", "bad_index")
-		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["errorCode"], 203, "")
 		assert.Equal(t, body["message"], "The given index in POST form is not a number", "")
@@ -233,18 +232,17 @@ func TestV2SetKeyCASOnValueSuccess(t *testing.T) {
 	tests.RunServer(func(s *server.Server) {
 		v := url.Values{}
 		v.Set("value", "XXX")
-		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		tests.ReadBody(resp)
 		v.Set("value", "YYY")
 		v.Set("prevValue", "XXX")
-		resp, _ = tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["action"], "compareAndSwap", "")
-
 		node := body["node"].(map[string]interface{})
 		assert.Equal(t, node["prevValue"], "XXX", "")
 		assert.Equal(t, node["value"], "YYY", "")
-		assert.Equal(t, node["modifiedIndex"], 2, "")
+		assert.Equal(t, node["modifiedIndex"], 3, "")
 	})
 }
 
@@ -257,16 +255,16 @@ func TestV2SetKeyCASOnValueFail(t *testing.T) {
 	tests.RunServer(func(s *server.Server) {
 		v := url.Values{}
 		v.Set("value", "XXX")
-		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		tests.ReadBody(resp)
 		v.Set("value", "YYY")
 		v.Set("prevValue", "AAA")
-		resp, _ = tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ = tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["errorCode"], 101, "")
 		assert.Equal(t, body["message"], "Test Failed", "")
-		assert.Equal(t, body["cause"], "[AAA != XXX] [0 != 1]", "")
-		assert.Equal(t, body["index"], 1, "")
+		assert.Equal(t, body["cause"], "[AAA != XXX] [0 != 2]", "")
+		assert.Equal(t, body["index"], 2, "")
 	})
 }
 
@@ -279,7 +277,7 @@ func TestV2SetKeyCASWithMissingValueFails(t *testing.T) {
 		v := url.Values{}
 		v.Set("value", "XXX")
 		v.Set("prevValue", "")
-		resp, _ := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/v2/keys/foo/bar"), v)
+		resp, _ := tests.PutForm(fmt.Sprintf("%s%s", s.URL(), "/v2/keys/foo/bar"), v)
 		body := tests.ReadBodyJSON(resp)
 		assert.Equal(t, body["errorCode"], 201, "")
 		assert.Equal(t, body["message"], "PrevValue is Required in POST form", "")

+ 9 - 8
test.sh

@@ -1,6 +1,10 @@
 #!/bin/sh
 set -e
 
+if [ -z "$PKG" ]; then
+    PKG="./store ./server ./server/v2/tests ./mod/lock/v2/tests"
+fi
+
 # Get GOPATH, etc from build
 . ./build
 
@@ -8,14 +12,11 @@ set -e
 export GOPATH="${PWD}"
 
 # Unit tests
-go test -i ./server
-go test -v ./server
-
-go test -i ./server/v2/tests
-go test -v ./server/v2/tests
-
-go test -i ./store
-go test -v ./store
+for i in $PKG
+do
+    go test -i $i
+    go test -v $i
+done
 
 # Functional tests
 go test -i ./tests/functional

+ 3 - 2
tests/server_utils.go

@@ -23,8 +23,9 @@ func RunServer(f func(*server.Server)) {
 
 	store := store.New()
 	registry := server.NewRegistry(store)
-	ps := server.NewPeerServer(testName, path, testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, testSnapshotCount)
-	s := server.New(testName, testClientURL, testClientURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store)
+	ps := server.NewPeerServer(testName, path, "http://" + testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, testSnapshotCount)
+	ps.MaxClusterSize = 9
+	s := server.New(testName, "http://" + testClientURL, testClientURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store)
 	ps.SetServer(s)
 
 	// Start up peer server.