Browse Source

use url package to parse url; fix commented codes

Xiang Li 12 years ago
parent
commit
d0e9449ba2
6 changed files with 90 additions and 21 deletions
  1. 14 0
      client_handlers.go
  2. 16 9
      etcd.go
  3. 20 0
      etcd_long_test.go
  4. 6 0
      store/store.go
  5. 12 12
      store/tree_store_test.go
  6. 22 0
      test.go

+ 14 - 0
client_handlers.go

@@ -315,6 +315,20 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 
 }
 
+// TestHandler
+func TestHttpHandler(w http.ResponseWriter, req *http.Request) {
+	testType := req.URL.Path[len("/test/"):]
+
+	if testType == "speed" {
+		directSet()
+		w.WriteHeader(http.StatusOK)
+		w.Write([]byte("speed test success"))
+		return
+	}
+
+	w.WriteHeader(http.StatusBadRequest)
+}
+
 // Convert string duration to time format
 func durationToExpireTime(strDuration string) (time.Time, error) {
 	if strDuration != "" {

+ 16 - 9
etcd.go

@@ -14,8 +14,10 @@ import (
 	"io/ioutil"
 	"net"
 	"net/http"
+	"net/url"
 	"os"
 	"os/signal"
+	"path"
 	"runtime/pprof"
 	"strings"
 	"time"
@@ -267,9 +269,6 @@ func startRaft(securityType int) {
 
 	raftServer.Start()
 
-	// start to response to raft requests
-	go startRaftTransport(info.RaftPort, securityType)
-
 	if raftServer.IsLogEmpty() {
 
 		// start as a leader in a new cluster
@@ -339,6 +338,9 @@ func startRaft(securityType int) {
 		go raftServer.Snapshot()
 	}
 
+	// start to response to raft requests
+	go startRaftTransport(info.RaftPort, securityType)
+
 }
 
 // Create transporter using by raft server
@@ -436,6 +438,7 @@ func startClientTransport(port int, st int) {
 	http.HandleFunc("/machines", MachinesHttpHandler)
 	http.HandleFunc("/", VersionHttpHandler)
 	http.HandleFunc("/stats", StatsHttpHandler)
+	http.HandleFunc("/test/", TestHttpHandler)
 
 	switch st {
 
@@ -624,15 +627,19 @@ func joinCluster(s *raft.Server, serverName string) error {
 				return nil
 			}
 			if resp.StatusCode == http.StatusTemporaryRedirect {
+
 				address := resp.Header.Get("Location")
 				debugf("Send Join Request to %s", address)
-				json.NewEncoder(&b).Encode(command)
-				segs := strings.Split(address, "://")
-				if len(segs) != 2 {
-					return fmt.Errorf("Unable to join: wrong redirection info")
+				u, err := url.Parse(address)
+
+				if err != nil {
+					return fmt.Errorf("Unable to join: %s", err.Error())
 				}
-				path := segs[1]
-				resp, err = t.Post(path, &b)
+
+				json.NewEncoder(&b).Encode(command)
+
+				resp, err = t.Post(path.Join(u.Host, u.Path), &b)
+
 			} else if resp.StatusCode == http.StatusBadRequest {
 				debug("Reach max number machines in the cluster")
 				return fmt.Errorf(errors[103])

+ 20 - 0
etcd_long_test.go

@@ -3,6 +3,7 @@ package main
 import (
 	"fmt"
 	"math/rand"
+	"net/http"
 	"os"
 	"strconv"
 	"strings"
@@ -119,3 +120,22 @@ func TestKillRandom(t *testing.T) {
 	<-leaderChan
 
 }
+
+func BenchmarkEtcdDirectCall(b *testing.B) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	clusterSize := 3
+	_, etcds, _ := createCluster(clusterSize, procAttr)
+
+	defer destroyCluster(etcds)
+
+	time.Sleep(time.Second)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		resp, _ := http.Get("http://0.0.0.0:4001/test/speed")
+		resp.Body.Close()
+	}
+
+}

+ 6 - 0
store/store.go

@@ -21,6 +21,12 @@ type Store struct {
 	// key-value store structure
 	Tree *tree
 
+	// This mutex protects everything except add watcher member.
+	// Add watch member does not depend on the current state of the store.
+	// And watch will return when other protected function is called and reach
+	// the watching condition.
+	// It is needed so that clone() can atomically replicate the Store
+	// and do the log snapshot in a go routine.
 	mutex sync.Mutex
 
 	// WatcherHub is where we register all the clients

+ 12 - 12
store/tree_store_test.go

@@ -1,7 +1,7 @@
 package store
 
 import (
-	//"fmt"
+	"fmt"
 	"math/rand"
 	"strconv"
 	"testing"
@@ -64,20 +64,20 @@ func TestStoreGet(t *testing.T) {
 	ts.set("/hello/fooo", NewTestNode("barbarbar"))
 	ts.set("/hello/foooo/foo", NewTestNode("barbarbar"))
 
-	//nodes, keys, ok := ts.list("/hello")
+	nodes, keys, ok := ts.list("/hello")
 
-	// if !ok {
-	// 	t.Fatalf("cannot list!")
-	// } else {
-	// 	nodes, _ := nodes.([]*Node)
-	// 	length := len(nodes)
+	if !ok {
+		t.Fatalf("cannot list!")
+	} else {
+		nodes, _ := nodes.([]*Node)
+		length := len(nodes)
 
-	// 	for i := 0; i < length; i++ {
-	// 		fmt.Println(keys[i], "=", nodes[i].Value)
-	// 	}
-	// }
+		for i := 0; i < length; i++ {
+			fmt.Println(keys[i], "=", nodes[i].Value)
+		}
+	}
 
-	keys := GenKeys(100, 10)
+	keys = GenKeys(100, 10)
 
 	for i := 0; i < 100; i++ {
 		value := strconv.Itoa(rand.Int())

+ 22 - 0
test.go

@@ -166,6 +166,28 @@ func getLeader(addr string) (string, error) {
 
 }
 
+func directSet() {
+	c := make(chan bool, 1000)
+	for i := 0; i < 1000; i++ {
+		go send(c)
+	}
+
+	for i := 0; i < 1000; i++ {
+		<-c
+	}
+}
+
+func send(c chan bool) {
+	for i := 0; i < 10; i++ {
+		command := &SetCommand{}
+		command.Key = "foo"
+		command.Value = "bar"
+		command.ExpireTime = time.Unix(0, 0)
+		raftServer.Do(command)
+	}
+	c <- true
+}
+
 // Dial with timeout
 func dialTimeoutFast(network, addr string) (net.Conn, error) {
 	return net.DialTimeout(network, addr, time.Millisecond*10)