Browse Source

mod/leader

Ben Johnson 12 years ago
parent
commit
61227d7477

+ 49 - 0
mod/leader/v2/delete_handler.go

@@ -0,0 +1,49 @@
+package v2
+
+import (
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+
+	"github.com/gorilla/mux"
+)
+
+// deleteHandler remove a given leader leader.
+func (h *handler) deleteHandler(w http.ResponseWriter, req *http.Request) {
+	vars := mux.Vars(req)
+	name := req.FormValue("name")
+	if name == "" {
+		http.Error(w, "leader name required", http.StatusInternalServerError)
+		return
+	}
+
+	// Proxy the request to the the lock service.
+	u, err := url.Parse(fmt.Sprintf("%s/mod/v2/lock/%s", h.addr, vars["key"]))
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	q := u.Query()
+	q.Set("value", name)
+	u.RawQuery = q.Encode()
+
+	r, err := http.NewRequest("DELETE", u.String(), nil)
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	// Read from the leader lock.
+	resp, err := h.client.Do(r)
+	if err != nil {
+		http.Error(w, "delete leader http error: " + err.Error(), http.StatusInternalServerError)
+		return
+	}
+	defer resp.Body.Close()
+	w.WriteHeader(resp.StatusCode)
+	if resp.StatusCode != http.StatusOK {
+		w.Write([]byte("delete leader error: "))
+	}
+	io.Copy(w, resp.Body)
+}

+ 29 - 0
mod/leader/v2/get_handler.go

@@ -0,0 +1,29 @@
+package v2
+
+import (
+	"fmt"
+	"io"
+	"net/http"
+
+	"github.com/gorilla/mux"
+)
+
+// getHandler retrieves the current leader.
+func (h *handler) getHandler(w http.ResponseWriter, req *http.Request) {
+	vars := mux.Vars(req)
+
+	// Proxy the request to the lock service.
+	url := fmt.Sprintf("%s/mod/v2/lock/%s?field=value", h.addr, vars["key"])
+	resp, err := h.client.Get(url)
+	if err != nil {
+		http.Error(w, "read leader error: " + err.Error(), http.StatusInternalServerError)
+		return
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		w.Write([]byte("get leader error: "))
+	}
+	w.WriteHeader(resp.StatusCode)
+	io.Copy(w, resp.Body)
+}

+ 34 - 0
mod/leader/v2/handler.go

@@ -0,0 +1,34 @@
+package v2
+
+import (
+	"net/http"
+
+	"github.com/gorilla/mux"
+)
+
+// prefix is appended to the lock's prefix since the leader mod uses the lock mod.
+const prefix = "/_mod/leader"
+
+// handler manages the leader HTTP request.
+type handler struct {
+	*mux.Router
+	client *http.Client
+	transport *http.Transport
+	addr string
+}
+
+// NewHandler creates an HTTP handler that can be registered on a router.
+func NewHandler(addr string) (http.Handler) {
+	transport := &http.Transport{DisableKeepAlives: false}
+	h := &handler{
+		Router: mux.NewRouter(),
+		client: &http.Client{Transport: transport},
+		transport: transport,
+		addr: addr,
+	}
+	h.StrictSlash(false)
+	h.HandleFunc("/{key:.*}", h.getHandler).Methods("GET")
+	h.HandleFunc("/{key:.*}", h.setHandler).Methods("PUT")
+	h.HandleFunc("/{key:.*}", h.deleteHandler).Methods("DELETE")
+	return h
+}

+ 63 - 0
mod/leader/v2/set_handler.go

@@ -0,0 +1,63 @@
+package v2
+
+import (
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+
+	"github.com/gorilla/mux"
+)
+
+// setHandler attempts to set the current leader.
+func (h *handler) setHandler(w http.ResponseWriter, req *http.Request) {
+	vars := mux.Vars(req)
+	name := req.FormValue("name")
+	if name == "" {
+		http.Error(w, "leader name required", http.StatusInternalServerError)
+		return
+	}
+
+	// Proxy the request to the the lock service.
+	u, err := url.Parse(fmt.Sprintf("%s/mod/v2/lock/%s", h.addr, vars["key"]))
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+	q := u.Query()
+	q.Set("value", name)
+	q.Set("ttl", req.FormValue("ttl"))
+	q.Set("timeout", req.FormValue("timeout"))
+	u.RawQuery = q.Encode()
+
+	r, err := http.NewRequest("POST", u.String(), nil)
+	if err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	// Close request if this connection disconnects.
+	closeNotifier, _ := w.(http.CloseNotifier)
+	stopChan := make(chan bool)
+	defer close(stopChan)
+	go func() {
+		select {
+		case <-closeNotifier.CloseNotify():
+			h.transport.CancelRequest(r)
+		case <-stopChan:
+		}
+	}()
+
+	// Read from the leader lock.
+	resp, err := h.client.Do(r)
+	if err != nil {
+		http.Error(w, "set leader http error: " + err.Error(), http.StatusInternalServerError)
+		return
+	}
+	defer resp.Body.Close()
+	w.WriteHeader(resp.StatusCode)
+	if resp.StatusCode != http.StatusOK {
+		w.Write([]byte("set leader error: "))
+	}
+	io.Copy(w, resp.Body)
+}

+ 80 - 0
mod/leader/v2/tests/mod_leader_test.go

@@ -0,0 +1,80 @@
+package leader
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/server"
+	"github.com/coreos/etcd/tests"
+	"github.com/stretchr/testify/assert"
+)
+
+// Ensure that a leader can be set and read.
+func TestModLeaderSet(t *testing.T) {
+	tests.RunServer(func(s *server.Server) {
+		// Set leader.
+		body, err := testSetLeader(s, "foo", "xxx", 10)
+		assert.NoError(t, err)
+		assert.Equal(t, body, "2")
+
+		// Check that the leader is set.
+		body, err = testGetLeader(s, "foo")
+		assert.NoError(t, err)
+		assert.Equal(t, body, "xxx")
+
+		// Delete leader.
+		body, err = testDeleteLeader(s, "foo", "xxx")
+		assert.NoError(t, err)
+		assert.Equal(t, body, "")
+
+		// Check that the leader is removed.
+		body, err = testGetLeader(s, "foo")
+		assert.NoError(t, err)
+		assert.Equal(t, body, "")
+	})
+}
+
+// Ensure that a leader can be renewed.
+func TestModLeaderRenew(t *testing.T) {
+	tests.RunServer(func(s *server.Server) {
+		// Set leader.
+		body, err := testSetLeader(s, "foo", "xxx", 2)
+		assert.NoError(t, err)
+		assert.Equal(t, body, "2")
+
+		time.Sleep(1 * time.Second)
+
+		// Renew leader.
+		body, err = testSetLeader(s, "foo", "xxx", 3)
+		assert.NoError(t, err)
+		assert.Equal(t, body, "2")
+
+		time.Sleep(2 * time.Second)
+
+		// Check that the leader is set.
+		body, err = testGetLeader(s, "foo")
+		assert.NoError(t, err)
+		assert.Equal(t, body, "xxx")
+	})
+}
+
+
+
+func testSetLeader(s *server.Server, key string, name string, ttl int) (string, error) {
+	resp, err := tests.PutForm(fmt.Sprintf("%s/mod/v2/leader/%s?name=%s&ttl=%d", s.URL(), key, name, ttl), nil)
+	ret := tests.ReadBody(resp)
+	return string(ret), err
+}
+
+func testGetLeader(s *server.Server, key string) (string, error) {
+	resp, err := tests.Get(fmt.Sprintf("%s/mod/v2/leader/%s", s.URL(), key))
+	ret := tests.ReadBody(resp)
+	return string(ret), err
+}
+
+func testDeleteLeader(s *server.Server, key string, name string) (string, error) {
+	resp, err := tests.DeleteForm(fmt.Sprintf("%s/mod/v2/leader/%s?name=%s", s.URL(), key, name), nil)
+	ret := tests.ReadBody(resp)
+	return string(ret), err
+}

+ 12 - 6
mod/lock/v2/acquire_handler.go

@@ -49,9 +49,15 @@ func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) {
 	}
 	}
 
 
 	// If node exists then just watch it. Otherwise create the node and watch it.
 	// If node exists then just watch it. Otherwise create the node and watch it.
-	index := h.findExistingNode(keypath, value)
+	node, index, pos := h.findExistingNode(keypath, value)
 	if index > 0 {
 	if index > 0 {
-		err = h.watch(keypath, index, nil)
+		if pos == 0 {
+			// If lock is already acquired then update the TTL.
+			h.client.Update(node.Key, node.Value, uint64(ttl))
+		} else {
+			// Otherwise watch until it becomes acquired (or errors).
+			err = h.watch(keypath, index, nil)
+		}
 	} else {
 	} else {
 		index, err = h.createNode(keypath, value, ttl, closeChan, stopChan)
 		index, err = h.createNode(keypath, value, ttl, closeChan, stopChan)
 	}
 	}
@@ -108,18 +114,18 @@ func (h *handler) createNode(keypath string, value string, ttl int, closeChan <-
 }
 }
 
 
 // findExistingNode search for a node on the lock with the given value.
 // findExistingNode search for a node on the lock with the given value.
-func (h *handler) findExistingNode(keypath string, value string) int {
+func (h *handler) findExistingNode(keypath string, value string) (*etcd.Node, int, int) {
 	if len(value) > 0 {
 	if len(value) > 0 {
 		resp, err := h.client.Get(keypath, true, true)
 		resp, err := h.client.Get(keypath, true, true)
 		if err == nil {
 		if err == nil {
 			nodes := lockNodes{resp.Node.Nodes}
 			nodes := lockNodes{resp.Node.Nodes}
-			if node := nodes.FindByValue(value); node != nil {
+			if node, pos := nodes.FindByValue(value); node != nil {
 				index, _ := strconv.Atoi(path.Base(node.Key))
 				index, _ := strconv.Atoi(path.Base(node.Key))
-				return index
+				return node, index, pos
 			}
 			}
 		}
 		}
 	}
 	}
-	return 0
+	return nil, 0, 0
 }
 }
 
 
 // ttlKeepAlive continues to update a key's TTL until the stop channel is closed.
 // ttlKeepAlive continues to update a key's TTL until the stop channel is closed.

+ 4 - 4
mod/lock/v2/lock_nodes.go

@@ -30,15 +30,15 @@ func (s lockNodes) First() *etcd.Node {
 }
 }
 
 
 // Retrieves the first node with a given value.
 // Retrieves the first node with a given value.
-func (s lockNodes) FindByValue(value string) *etcd.Node {
+func (s lockNodes) FindByValue(value string) (*etcd.Node, int) {
 	sort.Sort(s)
 	sort.Sort(s)
 
 
-	for _, node := range s.Nodes {
+	for i, node := range s.Nodes {
 		if node.Value == value {
 		if node.Value == value {
-			return &node
+			return &node, i
 		}
 		}
 	}
 	}
-	return nil
+	return nil, 0
 }
 }
 
 
 // Retrieves the index that occurs before a given index.
 // Retrieves the index that occurs before a given index.

+ 1 - 1
mod/lock/v2/release_handler.go

@@ -33,7 +33,7 @@ func (h *handler) releaseLockHandler(w http.ResponseWriter, req *http.Request) {
 			return
 			return
 		}
 		}
 		nodes := lockNodes{resp.Node.Nodes}
 		nodes := lockNodes{resp.Node.Nodes}
-		node := nodes.FindByValue(value)
+		node, _ := nodes.FindByValue(value)
 		if node == nil {
 		if node == nil {
 			http.Error(w, "release lock error: cannot find: " + value, http.StatusInternalServerError)
 			http.Error(w, "release lock error: cannot find: " + value, http.StatusInternalServerError)
 			return
 			return

+ 1 - 1
mod/lock/v2/renew_handler.go

@@ -41,7 +41,7 @@ func (h *handler) renewLockHandler(w http.ResponseWriter, req *http.Request) {
 			return
 			return
 		}
 		}
 		nodes := lockNodes{resp.Node.Nodes}
 		nodes := lockNodes{resp.Node.Nodes}
-		node := nodes.FindByValue(value)
+		node, _ := nodes.FindByValue(value)
 		if node == nil {
 		if node == nil {
 			http.Error(w, "renew lock error: cannot find: " + value, http.StatusInternalServerError)
 			http.Error(w, "renew lock error: cannot find: " + value, http.StatusInternalServerError)
 			return
 			return

+ 2 - 1
mod/mod.go

@@ -7,6 +7,7 @@ import (
 
 
 	"github.com/coreos/etcd/mod/dashboard"
 	"github.com/coreos/etcd/mod/dashboard"
 	lock2 "github.com/coreos/etcd/mod/lock/v2"
 	lock2 "github.com/coreos/etcd/mod/lock/v2"
+	leader2 "github.com/coreos/etcd/mod/leader/v2"
 	"github.com/gorilla/mux"
 	"github.com/gorilla/mux"
 )
 )
 
 
@@ -22,7 +23,7 @@ func HttpHandler(addr string) http.Handler {
 	r.HandleFunc("/dashboard", addSlash)
 	r.HandleFunc("/dashboard", addSlash)
 	r.PathPrefix("/dashboard/").Handler(http.StripPrefix("/dashboard/", dashboard.HttpHandler()))
 	r.PathPrefix("/dashboard/").Handler(http.StripPrefix("/dashboard/", dashboard.HttpHandler()))
 
 
-	// TODO: Use correct addr.
 	r.PathPrefix("/v2/lock").Handler(http.StripPrefix("/v2/lock", lock2.NewHandler(addr)))
 	r.PathPrefix("/v2/lock").Handler(http.StripPrefix("/v2/lock", lock2.NewHandler(addr)))
+	r.PathPrefix("/v2/leader").Handler(http.StripPrefix("/v2/leader", leader2.NewHandler(addr)))
 	return r
 	return r
 }
 }