Browse Source

Merge pull request #1048 from bcwaldon/proxy-monster

proxy mode
Xiang Li 11 years ago
parent
commit
338ca6050e
6 changed files with 237 additions and 16 deletions
  1. 16 8
      etcdserver/etcdhttp/http.go
  2. 54 0
      etcdserver/etcdhttp/http_test.go
  3. 31 7
      main.go
  4. 64 0
      proxy/proxy.go
  5. 71 0
      proxy/proxy_test.go
  6. 1 1
      test

+ 16 - 8
etcdserver/etcdhttp/http.go

@@ -85,6 +85,20 @@ func (ps Peers) IDs() []int64 {
 	return ids
 	return ids
 }
 }
 
 
+// Endpoints returns a list of all peer addresses. Each address is
+// prefixed with "http://". The returned list is sorted (asc).
+func (ps Peers) Endpoints() []string {
+	endpoints := make([]string, 0)
+	for _, addrs := range ps {
+		for _, addr := range addrs {
+			endpoints = append(endpoints, addScheme(addr))
+		}
+	}
+	sort.Strings(endpoints)
+
+	return endpoints
+}
+
 var errClosed = errors.New("etcdhttp: client closed connection")
 var errClosed = errors.New("etcdhttp: client closed connection")
 
 
 const DefaultTimeout = 500 * time.Millisecond
 const DefaultTimeout = 500 * time.Millisecond
@@ -209,14 +223,8 @@ func (h Handler) serveMachines(w http.ResponseWriter, r *http.Request) {
 		allow(w, "GET", "HEAD")
 		allow(w, "GET", "HEAD")
 		return
 		return
 	}
 	}
-	urls := make([]string, 0)
-	for _, addrs := range h.Peers {
-		for _, addr := range addrs {
-			urls = append(urls, addScheme(addr))
-		}
-	}
-	sort.Strings(urls)
-	w.Write([]byte(strings.Join(urls, ", ")))
+	endpoints := h.Peers.Endpoints()
+	w.Write([]byte(strings.Join(endpoints, ", ")))
 }
 }
 
 
 func (h Handler) serveRaft(ctx context.Context, w http.ResponseWriter, r *http.Request) {
 func (h Handler) serveRaft(ctx context.Context, w http.ResponseWriter, r *http.Request) {

+ 54 - 0
etcdserver/etcdhttp/http_test.go

@@ -397,3 +397,57 @@ func TestServeMachines(t *testing.T) {
 		t.Errorf("header = %d, want %d", writer.Code, http.StatusOK)
 		t.Errorf("header = %d, want %d", writer.Code, http.StatusOK)
 	}
 	}
 }
 }
+
+func TestPeersEndpoints(t *testing.T) {
+	tests := []struct {
+		peers     Peers
+		endpoints []string
+	}{
+		// single peer with a single address
+		{
+			peers: Peers(map[int64][]string{
+				1: []string{"192.0.2.1"},
+			}),
+			endpoints: []string{"http://192.0.2.1"},
+		},
+
+		// single peer with a single address with a port
+		{
+			peers: Peers(map[int64][]string{
+				1: []string{"192.0.2.1:8001"},
+			}),
+			endpoints: []string{"http://192.0.2.1:8001"},
+		},
+
+		// several peers explicitly unsorted
+		{
+			peers: Peers(map[int64][]string{
+				2: []string{"192.0.2.3", "192.0.2.4"},
+				3: []string{"192.0.2.5", "192.0.2.6"},
+				1: []string{"192.0.2.1", "192.0.2.2"},
+			}),
+			endpoints: []string{"http://192.0.2.1", "http://192.0.2.2", "http://192.0.2.3", "http://192.0.2.4", "http://192.0.2.5", "http://192.0.2.6"},
+		},
+
+		// no peers
+		{
+			peers:     Peers(map[int64][]string{}),
+			endpoints: []string{},
+		},
+
+		// peer with no endpoints
+		{
+			peers: Peers(map[int64][]string{
+				3: []string{},
+			}),
+			endpoints: []string{},
+		},
+	}
+
+	for i, tt := range tests {
+		endpoints := tt.peers.Endpoints()
+		if !reflect.DeepEqual(tt.endpoints, endpoints) {
+			t.Errorf("#%d: peers.Endpoints() incorrect: want=%#v got=%#v", i, tt.endpoints, endpoints)
+		}
+	}
+}

+ 31 - 7
main.go

@@ -12,6 +12,7 @@ import (
 
 
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/etcdhttp"
 	"github.com/coreos/etcd/etcdserver/etcdhttp"
+	"github.com/coreos/etcd/proxy"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/raft"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/wal"
 	"github.com/coreos/etcd/wal"
@@ -23,10 +24,11 @@ const (
 )
 )
 
 
 var (
 var (
-	fid     = flag.String("id", "0x1", "ID of this server")
-	timeout = flag.Duration("timeout", 10*time.Second, "Request Timeout")
-	laddr   = flag.String("l", ":8080", "HTTP service address (e.g., ':8080')")
-	dir     = flag.String("data-dir", "", "Path to the data directory")
+	fid       = flag.String("id", "0x1", "ID of this server")
+	timeout   = flag.Duration("timeout", 10*time.Second, "Request Timeout")
+	laddr     = flag.String("l", ":8080", "HTTP service address (e.g., ':8080')")
+	dir       = flag.String("data-dir", "", "Path to the data directory")
+	proxyMode = flag.Bool("proxy-mode", false, "Forward HTTP requests to peers, do not participate in raft.")
 
 
 	peers = &etcdhttp.Peers{}
 	peers = &etcdhttp.Peers{}
 )
 )
@@ -39,6 +41,18 @@ func init() {
 func main() {
 func main() {
 	flag.Parse()
 	flag.Parse()
 
 
+	var h http.Handler
+	if *proxyMode {
+		h = startProxy()
+	} else {
+		h = startEtcd()
+	}
+
+	http.Handle("/", h)
+	log.Fatal(http.ListenAndServe(*laddr, nil))
+}
+
+func startEtcd() http.Handler {
 	id, err := strconv.ParseInt(*fid, 0, 64)
 	id, err := strconv.ParseInt(*fid, 0, 64)
 	if err != nil {
 	if err != nil {
 		log.Fatal(err)
 		log.Fatal(err)
@@ -67,13 +81,14 @@ func main() {
 		Ticker: tk.C,
 		Ticker: tk.C,
 	}
 	}
 	etcdserver.Start(s)
 	etcdserver.Start(s)
-	h := &etcdhttp.Handler{
+
+	h := etcdhttp.Handler{
 		Timeout: *timeout,
 		Timeout: *timeout,
 		Server:  s,
 		Server:  s,
 		Peers:   *peers,
 		Peers:   *peers,
 	}
 	}
-	http.Handle("/", h)
-	log.Fatal(http.ListenAndServe(*laddr, nil))
+
+	return &h
 }
 }
 
 
 // startRaft starts a raft node from the given wal dir.
 // startRaft starts a raft node from the given wal dir.
@@ -107,3 +122,12 @@ func startRaft(id int64, peerIDs []int64, waldir string) (raft.Node, *wal.WAL) {
 	n := raft.Restart(id, peerIDs, 10, 1, st, ents)
 	n := raft.Restart(id, peerIDs, 10, 1, st, ents)
 	return n, w
 	return n, w
 }
 }
+
+func startProxy() http.Handler {
+	h, err := proxy.NewHandler((*peers).Endpoints())
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	return h
+}

+ 64 - 0
proxy/proxy.go

@@ -0,0 +1,64 @@
+package proxy
+
+import (
+	"errors"
+	"fmt"
+	"net/http"
+	"net/http/httputil"
+	"net/url"
+)
+
+func NewHandler(endpoints []string) (*httputil.ReverseProxy, error) {
+	d, err := newDirector(endpoints)
+	if err != nil {
+		return nil, err
+	}
+
+	proxy := httputil.ReverseProxy{
+		Director:      d.direct,
+		Transport:     &http.Transport{},
+		FlushInterval: 0,
+	}
+
+	return &proxy, nil
+}
+
+func newDirector(endpoints []string) (*director, error) {
+	if len(endpoints) == 0 {
+		return nil, errors.New("one or more endpoints required")
+	}
+
+	urls := make([]url.URL, len(endpoints))
+	for i, e := range endpoints {
+		u, err := url.Parse(e)
+		if err != nil {
+			return nil, fmt.Errorf("invalid endpoint %q: %v", e, err)
+		}
+
+		if u.Scheme == "" {
+			return nil, fmt.Errorf("invalid endpoint %q: scheme required", e)
+		}
+
+		if u.Host == "" {
+			return nil, fmt.Errorf("invalid endpoint %q: host empty", e)
+		}
+
+		urls[i] = *u
+	}
+
+	d := director{
+		endpoints: urls,
+	}
+
+	return &d, nil
+}
+
+type director struct {
+	endpoints []url.URL
+}
+
+func (d *director) direct(req *http.Request) {
+	choice := d.endpoints[0]
+	req.URL.Scheme = choice.Scheme
+	req.URL.Host = choice.Host
+}

+ 71 - 0
proxy/proxy_test.go

@@ -0,0 +1,71 @@
+package proxy
+
+import (
+	"net/http"
+	"net/url"
+	"reflect"
+	"testing"
+)
+
+func TestNewDirector(t *testing.T) {
+	tests := []struct {
+		good      bool
+		endpoints []string
+	}{
+		{true, []string{"http://192.0.2.8"}},
+		{true, []string{"http://192.0.2.8:8001"}},
+		{true, []string{"http://example.com"}},
+		{true, []string{"http://example.com:8001"}},
+		{true, []string{"http://192.0.2.8:8001", "http://example.com:8002"}},
+
+		{false, []string{"192.0.2.8"}},
+		{false, []string{"192.0.2.8:8001"}},
+		{false, []string{""}},
+	}
+
+	for i, tt := range tests {
+		_, err := newDirector(tt.endpoints)
+		if tt.good != (err == nil) {
+			t.Errorf("#%d: expected success = %t, got err = %v", i, tt.good, err)
+		}
+	}
+}
+
+func TestDirectorDirect(t *testing.T) {
+	d := &director{
+		endpoints: []url.URL{
+			url.URL{
+				Scheme: "http",
+				Host:   "bar.example.com",
+			},
+		},
+	}
+
+	req := &http.Request{
+		Method: "GET",
+		Host:   "foo.example.com",
+		URL: &url.URL{
+			Host: "foo.example.com",
+			Path: "/v2/keys/baz",
+		},
+	}
+
+	d.direct(req)
+
+	want := &http.Request{
+		Method: "GET",
+		// this field must not change
+		Host: "foo.example.com",
+		URL: &url.URL{
+			// the Scheme field is updated per the director's first endpoint
+			Scheme: "http",
+			// the Host field is updated per the director's first endpoint
+			Host: "bar.example.com",
+			Path: "/v2/keys/baz",
+		},
+	}
+
+	if !reflect.DeepEqual(want, req) {
+		t.Fatalf("HTTP request does not match expected criteria: want=%#v got=%#v", want, req)
+	}
+}

+ 1 - 1
test

@@ -14,7 +14,7 @@ COVER=${COVER:-"-cover"}
 
 
 source ./build
 source ./build
 
 
-TESTABLE="wal snap etcdserver etcdserver/etcdhttp etcdserver/etcdserverpb functional raft store"
+TESTABLE="wal snap etcdserver etcdserver/etcdhttp etcdserver/etcdserverpb functional proxy raft store"
 FORMATTABLE="$TESTABLE cors.go main.go"
 FORMATTABLE="$TESTABLE cors.go main.go"
 
 
 # user has not provided PKG override
 # user has not provided PKG override