Prechádzať zdrojové kódy

Initial mod_lock acquire.

Ben Johnson 12 rokov pred
rodič
commit
22c2935ddb

+ 45 - 19
mod/lock/acquire_handler.go

@@ -4,46 +4,72 @@ import (
 	"net/http"
 	"path"
 	"strconv"
+	"time"
 
 	"github.com/gorilla/mux"
 )
 
 // acquireHandler attempts to acquire a lock on the given key.
 func (h *handler) acquireHandler(w http.ResponseWriter, req *http.Request) {
+	h.client.SyncCluster()
+
 	vars := mux.Vars(req)
 	keypath := path.Join(prefix, vars["key"])
-	ttl, err := strconv.Atoi(vars["ttl"])
+	ttl, err := strconv.Atoi(req.FormValue("ttl"))
 	if err != nil {
 		http.Error(w, "invalid ttl: " + err.Error(), http.StatusInternalServerError)
 		return
 	}
 
 	// Create an incrementing id for the lock.
-	resp, err := h.client.AddChild(keypath, "X", ttl)
+	resp, err := h.client.AddChild(keypath, "-", uint64(ttl))
 	if err != nil {
 		http.Error(w, "add lock index error: " + err.Error(), http.StatusInternalServerError)
 		return
 	}
 
+	// Keep updating TTL to make sure lock request is not expired before acquisition.
+	stopChan := make(chan bool)
+	defer close(stopChan)
+	go func(k string) {
+		stopped := false
+		for {
+			select {
+			case <-time.After(time.Duration(ttl / 2) * time.Second):
+			case <-stopChan:
+				stopped = true
+			}
+			h.client.Update(k, "-", uint64(ttl))
+			if stopped {
+				break
+			}
+		}
+	}(resp.Key)
+
 	// Extract the lock index.
 	index, _ := strconv.Atoi(path.Base(resp.Key))
 
-	// Read all indices.
-	resp, err = h.client.GetAll(key)
-	if err != nil {
-		http.Error(w, "lock children lookup error: " + err.Error(), http.StatusInternalServerError)
-		return
+	for {
+		// Read all indices.
+		resp, err = h.client.GetAll(keypath, true)
+		if err != nil {
+			http.Error(w, "lock children lookup error: " + err.Error(), http.StatusInternalServerError)
+			return
+		}
+		indices := extractResponseIndices(resp)
+		waitIndex := resp.ModifiedIndex
+		prevIndex := findPrevIndex(indices, index)
+
+		// If there is no previous index then we have the lock.
+		if prevIndex == 0 {
+			break
+		}
+
+		// Otherwise watch previous index until it's gone.
+		_, err = h.client.Watch(path.Join(keypath, strconv.Itoa(prevIndex)), waitIndex, nil, nil)
+		if err != nil {
+			http.Error(w, "lock watch error: " + err.Error(), http.StatusInternalServerError)
+			return
+		}
 	}
-	indices := extractResponseIndices(resp)
-
-	// TODO: child_keys := parse_and_sort_child_keys
-	// TODO: if index == min(child_keys) then return 200
-	// TODO: else:
-	// TODO: h.client.WatchAll(key)
-	// TODO: if next_lowest_key is deleted
-	// TODO: get_all_keys
-	// TODO: if index == min(child_keys) then return 200
-	// TODO: rinse_and_repeat until we're the lowest.
-
-	// TODO: 
 }

+ 5 - 2
mod/lock/handler.go

@@ -1,9 +1,10 @@
 package lock
 
 import (
-	"fmt"
 	"net/http"
 	"path"
+	"strconv"
+	"sort"
 
 	"github.com/gorilla/mux"
 	"github.com/coreos/go-etcd/etcd"
@@ -19,6 +20,7 @@ type handler struct {
 
 // NewHandler creates an HTTP handler that can be registered on a router.
 func NewHandler(addr string) (http.Handler) {
+	etcd.OpenDebug()
 	h := &handler{
 		Router: mux.NewRouter(),
 		client: etcd.NewClient([]string{addr}),
@@ -36,9 +38,10 @@ func extractResponseIndices(resp *etcd.Response) []int {
 	var indices []int
 	for _, kv := range resp.Kvs {
 		if index, _ := strconv.Atoi(path.Base(kv.Key)); index > 0 {
-			indicies = append(indices, index)
+			indices = append(indices, index)
 		}
 	}
+	sort.Ints(indices)
 	return indices
 }
 

+ 5 - 3
mod/lock/renew_handler.go

@@ -2,15 +2,17 @@ package lock
 
 import (
 	"net/http"
+	_ "path"
+
+	_ "github.com/gorilla/mux"
 )
 
 // renewLockHandler attempts to update the TTL on an existing lock.
 // Returns a 200 OK if successful. Otherwie 
 func (h *handler) renewLockHandler(w http.ResponseWriter, req *http.Request) {
+	/*
 	vars := mux.Vars(req)
 	key := path.Join(prefix, vars["key"])
 	ttl := vars["ttl"]
-	w.Write([]byte(fmt.Sprintf("%s-%s", key, ttl)))
-
-	// TODO:
+	*/
 }

+ 2 - 2
mod/lock/tests/handler_test.go

@@ -16,12 +16,12 @@ func TestModLockAcquire(t *testing.T) {
 	v := url.Values{}
 	tests.RunServer(func(s *server.Server) {
 		// Acquire lock.
-		resp, err := tests.PutForm(fmt.Sprintf("http://%s%s", s.URL(), "/mod/lock"), v)
+		url := fmt.Sprintf("http://%s%s", s.URL(), "/mod/lock/foo?ttl=2")
+		resp, err := tests.PutForm(url, v)
 		assert.NoError(t, err)
 		ret := tests.ReadBody(resp)
 		assert.Equal(t, string(ret), "XXX")
 
-		fmt.Println("URL:", fmt.Sprintf("http://%s%s", s.URL(), "/mod/lock"))
 		time.Sleep(60 * time.Second)
 		// TODO: Check that it has been acquired.
 		// TODO: Release lock.

+ 2 - 3
mod/mod.go

@@ -17,13 +17,12 @@ func addSlash(w http.ResponseWriter, req *http.Request) {
 	return
 }
 
-func HttpHandler() (handler http.Handler) {
+func HttpHandler(addr string) http.Handler {
 	r := mux.NewRouter()
 	r.HandleFunc("/dashboard", addSlash)
 	r.PathPrefix("/dashboard/").Handler(http.StripPrefix("/dashboard/", dashboard.HttpHandler()))
 
 	// TODO: Use correct addr.
-	r.HandleFunc("/lock", addSlash)
-	r.PathPrefix("/lock").Handler(http.StripPrefix("/lock", lock.NewHandler("127.0.0.1:4001")))
+	r.PathPrefix("/lock").Handler(http.StripPrefix("/lock", lock.NewHandler(addr)))
 	return r
 }

+ 1 - 0
server/registry.go

@@ -46,6 +46,7 @@ func (r *Registry) Register(name string, peerURL string, url string) error {
 	key := path.Join(RegistryKey, name)
 	value := fmt.Sprintf("raft=%s&etcd=%s", peerURL, url)
 	_, err := r.store.Create(key, value, false, store.Permanent)
+	fmt.Println("register.1:", key, value, err)
 	log.Debugf("Register: %s", name)
 	return err
 }

+ 3 - 1
server/server.go

@@ -130,7 +130,7 @@ func (s *Server) installV2() {
 
 func (s *Server) installMod() {
 	r := s.router
-	r.PathPrefix("/mod").Handler(http.StripPrefix("/mod", mod.HttpHandler()))
+	r.PathPrefix("/mod").Handler(http.StripPrefix("/mod", mod.HttpHandler(s.url)))
 }
 
 // Adds a v1 server handler to the router.
@@ -320,12 +320,14 @@ func (s *Server) GetVersionHandler(w http.ResponseWriter, req *http.Request) err
 // Handler to return the current leader's raft address
 func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) error {
 	leader := s.peerServer.RaftServer().Leader()
+	fmt.Println("/leader.1?", leader)
 	if leader == "" {
 		return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", s.Store().Index())
 	}
 	w.WriteHeader(http.StatusOK)
 	url, _ := s.registry.PeerURL(leader)
 	w.Write([]byte(url))
+	fmt.Println("/leader.2?", leader, url)
 	return nil
 }
 

+ 8 - 8
test.sh

@@ -1,6 +1,9 @@
 #!/bin/sh
 set -e
 
+PKGS="./mod/lock/tests"
+# PKGS="./store ./server ./server/v2/tests"
+
 # Get GOPATH, etc from build
 . ./build
 
@@ -8,14 +11,11 @@ set -e
 export GOPATH="${PWD}"
 
 # Unit tests
-go test -i ./server
-go test -v ./server
-
-go test -i ./server/v2/tests
-go test -v ./server/v2/tests
-
-go test -i ./store
-go test -v ./store
+for PKG in $PKGS
+do
+    go test -i $PKG
+    go test -v $PKG
+done
 
 # Functional tests
 go test -i ./tests/functional

+ 3 - 2
tests/server_utils.go

@@ -23,8 +23,9 @@ func RunServer(f func(*server.Server)) {
 
 	store := store.New()
 	registry := server.NewRegistry(store)
-	ps := server.NewPeerServer(testName, path, testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, testSnapshotCount)
-	s := server.New(testName, testClientURL, testClientURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store)
+	ps := server.NewPeerServer(testName, path, "http://" + testRaftURL, testRaftURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, registry, store, testSnapshotCount)
+	ps.MaxClusterSize = 9
+	s := server.New(testName, "http://" + testClientURL, testClientURL, &server.TLSConfig{Scheme: "http"}, &server.TLSInfo{}, ps, registry, store)
 	ps.SetServer(s)
 
 	// Start up peer server.

+ 1 - 1
third_party/github.com/coreos/go-etcd/etcd/requests.go

@@ -207,7 +207,7 @@ func (c *Client) sendRequest(method string, _path string, values url.Values) (*R
 		if err != nil {
 			retry++
 			if retry > 2*len(c.cluster.Machines) {
-				return nil, errors.New("Cannot reach servers")
+				return nil, errors.New("Cannot reach servers" + err.Error())
 			}
 			num := retry % len(c.cluster.Machines)
 			logger.Debug("update.leader[", c.cluster.Leader, ",", c.cluster.Machines[num], "]")