Browse Source

merge master

Xiang Li 12 years ago
parent
commit
44e8c234ed
10 changed files with 238 additions and 44 deletions
  1. 26 6
      README.md
  2. 39 4
      etcd.go
  3. 25 6
      etcd_handlers.go
  4. 2 9
      etcd_server.go
  5. 26 0
      etcd_test.go
  6. 8 7
      raft_server.go
  7. 1 1
      raft_stats.go
  8. 54 11
      transporter.go
  9. 36 0
      transporter_test.go
  10. 21 0
      util.go

+ 26 - 6
README.md

@@ -1,4 +1,5 @@
 # etcd
+README version 0.1.0
 
 [![Build Status](https://travis-ci.org/coreos/etcd.png)](https://travis-ci.org/coreos/etcd)
 
@@ -272,7 +273,7 @@ Next, lets configure etcd to use this keypair:
 You can now test the configuration using https:
 
 ```sh
-curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v1/keys/foo -F value=bar
+curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v1/keys/foo -d value=bar -v
 ```
 
 You should be able to see the handshake succeed.
@@ -302,7 +303,7 @@ We can also do authentication using CA certs. The clients will provide their cer
 Try the same request to this server:
 
 ```sh
-curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v1/keys/foo -F value=bar
+curl --cacert fixtures/ca/ca.crt https://127.0.0.1:4001/v1/keys/foo -d value=bar -v
 ```
 
 The request should be rejected by the server.
@@ -347,6 +348,9 @@ We use -s to specify server port and -c to specify client port and -d to specify
 ./etcd -s 127.0.0.1:7001 -c 127.0.0.1:4001 -d nodes/node1 -n node1
 ```
 
+**Note:** If you want to run etcd on external IP address and still have access locally you need to add `-cl 0.0.0.0` so that it will listen on both external and localhost addresses.
+A similar argument `-sl` is used to setup the listening address for the server port.
+
 Let the join two more nodes to this cluster using the -C argument:
 
 ```sh
@@ -363,7 +367,7 @@ curl -L http://127.0.0.1:4001/v1/machines
 We should see there are three nodes in the cluster
 
 ```
-0.0.0.0:4001,0.0.0.0:4002,0.0.0.0:4003
+http://127.0.0.1:4001, http://127.0.0.1:4002, http://127.0.0.1:4003
 ```
 
 The machine list is also available via this API:
@@ -373,7 +377,7 @@ curl -L http://127.0.0.1:4001/v1/keys/_etcd/machines
 ```
 
 ```json
-[{"action":"GET","key":"/machines/node1","value":"0.0.0.0,7001,4001","index":4},{"action":"GET","key":"/machines/node3","value":"0.0.0.0,7002,4002","index":4},{"action":"GET","key":"/machines/node4","value":"0.0.0.0,7003,4003","index":4}]
+[{"action":"GET","key":"/_etcd/machines/node1","value":"raft=http://127.0.0.1:7001&etcd=http://127.0.0.1:4001","index":4},{"action":"GET","key":"/_etcd/machines/node2","value":"raft=http://127.0.0.1:7002&etcd=http://127.0.0.1:4002","index":4},{"action":"GET","key":"/_etcd/machines/node3","value":"raft=http://127.0.0.1:7003&etcd=http://127.0.0.1:4003","index":4}]
 ```
 
 The key of the machine is based on the ```commit index``` when it was added. The value of the machine is ```hostname```, ```raft port``` and ```client port```.
@@ -386,7 +390,7 @@ curl -L http://127.0.0.1:4001/v1/leader
 The first server we set up should be the leader, if it has not dead during these commands.
 
 ```
-0.0.0.0:7001
+http://127.0.0.1:7001
 ```
 
 Now we can do normal SET and GET operations on keys as we explored earlier.
@@ -414,7 +418,13 @@ curl -L http://127.0.0.1:4001/v1/leader
 ```
 
 ```
-0.0.0.0:7002 or 0.0.0.0:7003
+http://127.0.0.1:7002
+```
+
+or
+
+```
+http://127.0.0.1:7003
 ```
 
 You should be able to see this:
@@ -455,6 +465,16 @@ If you are using SSL for server to server communication, you must use it on all
 
 - [go-etcd](https://github.com/coreos/go-etcd)
 
+**Java libraries**
+
+- [justinsb/jetcd](https://github.com/justinsb/jetcd)
+- [diwakergupta/jetcd](https://github.com/diwakergupta/jetcd)
+
+
+**Python libraries**
+
+- [transitorykris/etcd-py](https://github.com/transitorykris/etcd-py)
+
 **Node libraries**
 
 - [stianeikeland/node-etcd](https://github.com/stianeikeland/node-etcd)

+ 39 - 4
etcd.go

@@ -3,7 +3,9 @@ package main
 import (
 	"crypto/tls"
 	"flag"
+	"fmt"
 	"io/ioutil"
+	"net/url"
 	"os"
 	"strings"
 	"time"
@@ -41,6 +43,9 @@ var (
 	maxClusterSize int
 
 	cpuprofile string
+
+	cors     string
+	corsList map[string]bool
 )
 
 func init() {
@@ -51,8 +56,10 @@ func init() {
 	flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma")
 
 	flag.StringVar(&argInfo.Name, "n", "default-name", "the node name (required)")
-	flag.StringVar(&argInfo.EtcdURL, "c", "127.0.0.1:4001", "the hostname:port for etcd client communication")
-	flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the hostname:port for raft server communication")
+	flag.StringVar(&argInfo.EtcdURL, "c", "127.0.0.1:4001", "the advertised public hostname:port for etcd client communication")
+	flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the advertised public hostname:port for raft server communication")
+	flag.StringVar(&argInfo.EtcdListenHost, "cl", "", "the listening hostname for etcd client communication (defaults to advertised ip)")
+	flag.StringVar(&argInfo.RaftListenHost, "sl", "", "the listening hostname for raft server communication (defaults to advertised ip)")
 	flag.StringVar(&argInfo.WebURL, "w", "", "the hostname:port of web interface")
 
 	flag.StringVar(&argInfo.RaftTLS.CAFile, "serverCAFile", "", "the path of the CAFile")
@@ -76,6 +83,8 @@ func init() {
 	flag.IntVar(&maxClusterSize, "maxsize", 9, "the max size of the cluster")
 
 	flag.StringVar(&cpuprofile, "cpuprofile", "", "write cpu profile to file")
+
+	flag.StringVar(&cors, "cors", "", "whitelist origins for cross-origin resource sharing (e.g. '*' or 'http://localhost:8001,etc')")
 }
 
 const (
@@ -108,6 +117,9 @@ type Info struct {
 	EtcdURL string `json:"etcdURL"`
 	WebURL  string `json:"webURL"`
 
+	RaftListenHost string `json:"raftListenHost"`
+	EtcdListenHost string `json:"etcdListenHost"`
+
 	RaftTLS TLSInfo `json:"raftTLS"`
 	EtcdTLS TLSInfo `json:"etcdTLS"`
 }
@@ -148,6 +160,8 @@ func main() {
 		raft.SetLogLevel(raft.Debug)
 	}
 
+	parseCorsFlag()
+
 	if machines != "" {
 		cluster = strings.Split(machines, ",")
 	} else if machinesFile != "" {
@@ -179,6 +193,9 @@ func main() {
 	argInfo.EtcdURL = sanitizeURL(argInfo.EtcdURL, etcdTLSConfig.Scheme)
 	argInfo.WebURL = sanitizeURL(argInfo.WebURL, "http")
 
+	argInfo.RaftListenHost = sanitizeListenHost(argInfo.RaftListenHost, argInfo.RaftURL)
+	argInfo.EtcdListenHost = sanitizeListenHost(argInfo.EtcdListenHost, argInfo.EtcdURL)
+
 	// Read server info from file or grab it from user.
 	if err := os.MkdirAll(dirPath, 0744); err != nil {
 		fatalf("Unable to create path: %s", err)
@@ -191,11 +208,29 @@ func main() {
 	snapConf = newSnapshotConf()
 
 	// Create etcd and raft server
-	e = newEtcdServer(info.Name, info.EtcdURL, &etcdTLSConfig, &info.EtcdTLS)
-	r = newRaftServer(info.Name, info.RaftURL, &raftTLSConfig, &info.RaftTLS)
+	e = newEtcdServer(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS)
+	r = newRaftServer(info.Name, info.RaftURL, info.RaftListenHost, &raftTLSConfig, &info.RaftTLS)
 
 	startWebInterface()
 	r.ListenAndServe()
 	e.ListenAndServe()
 
 }
+
+// parseCorsFlag gathers up the cors whitelist and puts it into the corsList.
+func parseCorsFlag() {
+	if cors != "" {
+		corsList = make(map[string]bool)
+		list := strings.Split(cors, ",")
+		for _, v := range list {
+			fmt.Println(v)
+			if v != "*" {
+				_, err := url.Parse(v)
+				if err != nil {
+					panic(fmt.Sprintf("bad cors url: %s", err))
+				}
+			}
+			corsList[v] = true
+		}
+	}
+}

+ 25 - 6
etcd_handlers.go

@@ -30,7 +30,26 @@ func NewEtcdMuxer() *http.ServeMux {
 
 type errorHandler func(http.ResponseWriter, *http.Request) error
 
+// addCorsHeader parses the request Origin header and loops through the user
+// provided allowed origins and sets the Access-Control-Allow-Origin header if
+// there is a match.
+func addCorsHeader(w http.ResponseWriter, r *http.Request) {
+	val, ok := corsList["*"]
+	if val && ok {
+		w.Header().Add("Access-Control-Allow-Origin", "*")
+		return
+	}
+
+	requestOrigin := r.Header.Get("Origin")
+	val, ok = corsList[requestOrigin]
+	if val && ok {
+		w.Header().Add("Access-Control-Allow-Origin", requestOrigin)
+		return
+	}
+}
+
 func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	addCorsHeader(w, r)
 	if e := fn(w, r); e != nil {
 		if etcdErr, ok := e.(etcdErr.Error); ok {
 			debug("Return error: ", etcdErr.Error())
@@ -74,15 +93,15 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) error {
 
 	debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
 
-	value := req.FormValue("value")
+	req.ParseForm()
+
+	value := req.Form.Get("value")
 
 	if len(value) == 0 {
 		return etcdErr.NewError(200, "Set")
 	}
 
-	prevValue := req.FormValue("prevValue")
-
-	strDuration := req.FormValue("ttl")
+	strDuration := req.Form.Get("ttl")
 
 	expireTime, err := durationToExpireTime(strDuration)
 
@@ -90,11 +109,11 @@ func SetHttpHandler(w http.ResponseWriter, req *http.Request) error {
 		return etcdErr.NewError(202, "Set")
 	}
 
-	if len(prevValue) != 0 {
+	if prevValueArr, ok := req.Form["prevValue"]; ok && len(prevValueArr) > 0 {
 		command := &TestAndSetCommand{
 			Key:        key,
 			Value:      value,
-			PrevValue:  prevValue,
+			PrevValue:  prevValueArr[0],
 			ExpireTime: expireTime,
 		}
 

+ 2 - 9
etcd_server.go

@@ -2,7 +2,6 @@ package main
 
 import (
 	"net/http"
-	"net/url"
 )
 
 type etcdServer struct {
@@ -15,18 +14,12 @@ type etcdServer struct {
 
 var e *etcdServer
 
-func newEtcdServer(name string, urlStr string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *etcdServer {
-	u, err := url.Parse(urlStr)
-
-	if err != nil {
-		fatalf("invalid url '%s': %s", e.url, err)
-	}
-
+func newEtcdServer(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *etcdServer {
 	return &etcdServer{
 		Server: http.Server{
 			Handler:   NewEtcdMuxer(),
 			TLSConfig: &tlsConf.Server,
-			Addr:      u.Host,
+			Addr:      listenHost,
 		},
 		name:    name,
 		url:     urlStr,

+ 26 - 0
etcd_test.go

@@ -55,6 +55,32 @@ func TestSingleNode(t *testing.T) {
 		}
 		t.Fatalf("Set 2 failed with %s %s %v", result.Key, result.Value, result.TTL)
 	}
+
+	// Add a test-and-set test
+
+	// First, we'll test we can change the value if we get it write
+	result, match, err := c.TestAndSet("foo", "bar", "foobar", 100)
+
+	if err != nil || result.Key != "/foo" || result.Value != "foobar" || result.PrevValue != "bar" || result.TTL != 99 || !match {
+		if err != nil {
+			t.Fatal(err)
+		}
+		t.Fatalf("Set 3 failed with %s %s %v", result.Key, result.Value, result.TTL)
+	}
+
+	// Next, we'll make sure we can't set it without the correct prior value
+	_, _, err = c.TestAndSet("foo", "bar", "foofoo", 100)
+
+	if err == nil {
+		t.Fatalf("Set 4 expecting error when setting key with incorrect previous value")
+	}
+
+	// Finally, we'll make sure a blank previous value still counts as a test-and-set and still has to match
+	_, _, err = c.TestAndSet("foo", "", "barbar", 100)
+
+	if err == nil {
+		t.Fatalf("Set 5 expecting error when setting key with blank (incorrect) previous value")
+	}
 }
 
 // TestInternalVersionFail will ensure that etcd does not come up if the internal raft

+ 8 - 7
raft_server.go

@@ -21,6 +21,7 @@ type raftServer struct {
 	joinIndex   uint64
 	name        string
 	url         string
+	listenHost  string
 	tlsConf     *TLSConfig
 	tlsInfo     *TLSInfo
 	peersStats  map[string]*raftPeerStats
@@ -29,10 +30,10 @@ type raftServer struct {
 
 var r *raftServer
 
-func newRaftServer(name string, url string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer {
+func newRaftServer(name string, url string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer {
 
 	// Create transporter for raft
-	raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client)
+	raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client, ElectionTimeout)
 
 	// Create raft server
 	server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil)
@@ -147,15 +148,14 @@ func startAsFollower() {
 
 // Start to listen and response raft command
 func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) {
-	u, _ := url.Parse(r.url)
-	infof("raft server [%s:%s]", r.name, u)
+	infof("raft server [%s:%s]", r.name, r.listenHost)
 
 	raftMux := http.NewServeMux()
 
 	server := &http.Server{
 		Handler:   raftMux,
 		TLSConfig: &tlsConf,
-		Addr:      u.Host,
+		Addr:      r.listenHost,
 	}
 
 	// internal commands
@@ -181,7 +181,7 @@ func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) {
 // getVersion fetches the raft version of a peer. This works for now but we
 // will need to do something more sophisticated later when we allow mixed
 // version clusters.
-func getVersion(t transporter, versionURL url.URL) (string, error) {
+func getVersion(t *transporter, versionURL url.URL) (string, error) {
 	resp, err := t.Get(versionURL.String())
 
 	if err != nil {
@@ -210,6 +210,7 @@ func joinCluster(cluster []string) bool {
 			if _, ok := err.(etcdErr.Error); ok {
 				fatal(err)
 			}
+
 			debugf("cannot join to cluster via machine %s %s", machine, err)
 		}
 	}
@@ -221,7 +222,7 @@ func joinByMachine(s *raft.Server, machine string, scheme string) error {
 	var b bytes.Buffer
 
 	// t must be ok
-	t, _ := r.Transporter().(transporter)
+	t, _ := r.Transporter().(*transporter)
 
 	// Our version must match the leaders version
 	versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"}

+ 1 - 1
raft_stats.go

@@ -168,7 +168,7 @@ func (q *statsQueue) Rate() (float64, float64) {
 		return 0, 0
 	}
 
-	if time.Now.Sub(back.Time()) > time.Second {
+	if time.Now().Sub(back.Time()) > time.Second {
 		q.Clear()
 		return 0, 0
 	}

+ 54 - 11
transporter.go

@@ -15,13 +15,20 @@ import (
 
 // Transporter layer for communication between raft nodes
 type transporter struct {
-	client *http.Client
+	client  *http.Client
+	timeout time.Duration
+}
+
+// response struct
+type transporterResponse struct {
+	resp *http.Response
+	err  error
 }
 
 // Create transporter using by raft server
 // Create http or https transporter based on
 // whether the user give the server cert and key
-func newTransporter(scheme string, tlsConf tls.Config) transporter {
+func newTransporter(scheme string, tlsConf tls.Config, timeout time.Duration) *transporter {
 	t := transporter{}
 
 	tr := &http.Transport{
@@ -34,8 +41,9 @@ func newTransporter(scheme string, tlsConf tls.Config) transporter {
 	}
 
 	t.client = &http.Client{Transport: tr}
+	t.timeout = timeout
 
-	return t
+	return &t
 }
 
 // Dial with timeout
@@ -44,7 +52,7 @@ func dialTimeout(network, addr string) (net.Conn, error) {
 }
 
 // Sends AppendEntries RPCs to a peer when the server is the leader.
-func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
+func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
 	var aersp *raft.AppendEntriesResponse
 	var b bytes.Buffer
 
@@ -92,7 +100,7 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe
 }
 
 // Sends RequestVote RPCs to a peer when the server is the candidate.
-func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
+func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
 	var rvrsp *raft.RequestVoteResponse
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
@@ -118,7 +126,7 @@ func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *
 }
 
 // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
-func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
+func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
 	var aersp *raft.SnapshotResponse
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
@@ -146,7 +154,7 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r
 }
 
 // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
-func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
+func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
 	var aersp *raft.SnapshotRecoveryResponse
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
@@ -173,11 +181,46 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft
 }
 
 // Send server side POST request
-func (t transporter) Post(path string, body io.Reader) (*http.Response, error) {
-	return t.client.Post(path, "application/json", body)
+func (t *transporter) Post(path string, body io.Reader) (*http.Response, error) {
+
+	c := make(chan *transporterResponse, 1)
+
+	go func() {
+		tr := new(transporterResponse)
+		tr.resp, tr.err = t.client.Post(path, "application/json", body)
+		c <- tr
+	}()
+
+	return t.waitResponse(c)
+
 }
 
 // Send server side GET request
-func (t transporter) Get(path string) (*http.Response, error) {
-	return t.client.Get(path)
+func (t *transporter) Get(path string) (*http.Response, error) {
+
+	c := make(chan *transporterResponse, 1)
+
+	go func() {
+		tr := new(transporterResponse)
+		tr.resp, tr.err = t.client.Get(path)
+		c <- tr
+	}()
+
+	return t.waitResponse(c)
+}
+
+func (t *transporter) waitResponse(responseChan chan *transporterResponse) (*http.Response, error) {
+
+	timeoutChan := time.After(t.timeout)
+
+	select {
+	case <-timeoutChan:
+		return nil, fmt.Errorf("Wait Response Timeout: %v", t.timeout)
+
+	case r := <-responseChan:
+		return r.resp, r.err
+	}
+
+	// for complier
+	return nil, nil
 }

+ 36 - 0
transporter_test.go

@@ -0,0 +1,36 @@
+package main
+
+import (
+	"crypto/tls"
+	"testing"
+	"time"
+)
+
+func TestTransporterTimeout(t *testing.T) {
+
+	conf := tls.Config{}
+
+	ts := newTransporter("http", conf, time.Second)
+
+	ts.Get("http://google.com")
+	_, err := ts.Get("http://google.com:9999") // it doesn't exisit
+	if err == nil || err.Error() != "Wait Response Timeout: 1s" {
+		t.Fatal("timeout error: ", err.Error())
+	}
+
+	_, err = ts.Post("http://google.com:9999", nil) // it doesn't exisit
+	if err == nil || err.Error() != "Wait Response Timeout: 1s" {
+		t.Fatal("timeout error: ", err.Error())
+	}
+
+	_, err = ts.Get("http://www.google.com")
+	if err != nil {
+		t.Fatal("get error")
+	}
+
+	_, err = ts.Post("http://www.google.com", nil)
+	if err != nil {
+		t.Fatal("post error")
+	}
+
+}

+ 21 - 0
util.go

@@ -107,6 +107,27 @@ func sanitizeURL(host string, defaultScheme string) string {
 	return p.String()
 }
 
+// sanitizeListenHost cleans up the ListenHost parameter and appends a port
+// if necessary based on the advertised port.
+func sanitizeListenHost(listen string, advertised string) string {
+	aurl, err := url.Parse(advertised)
+	if err != nil {
+		fatal(err)
+	}
+
+	ahost, aport, err := net.SplitHostPort(aurl.Host)
+	if err != nil {
+		fatal(err)
+	}
+
+	// If the listen host isn't set use the advertised host
+	if listen == "" {
+		listen = ahost
+	}
+
+	return net.JoinHostPort(listen, aport)
+}
+
 func check(err error) {
 	if err != nil {
 		fatal(err)