Browse Source

Merge pull request #1060 from jonboulle/peers

etcdserver/etcdhttp: split out peers and add tests
Xiang Li 11 years ago
parent
commit
50c1a34f78
3 changed files with 289 additions and 134 deletions
  1. 2 124
      etcdserver/etcdhttp/http.go
  2. 139 0
      etcdserver/etcdhttp/peers.go
  3. 148 10
      etcdserver/etcdhttp/peers_test.go

+ 2 - 124
etcdserver/etcdhttp/http.go

@@ -1,7 +1,6 @@
 package etcdhttp
 package etcdhttp
 
 
 import (
 import (
-	"bytes"
 	"encoding/binary"
 	"encoding/binary"
 	"encoding/json"
 	"encoding/json"
 	"errors"
 	"errors"
@@ -11,13 +10,11 @@ import (
 	"log"
 	"log"
 	"net/http"
 	"net/http"
 	"net/url"
 	"net/url"
-	"sort"
 	"strconv"
 	"strconv"
 	"strings"
 	"strings"
 	"time"
 	"time"
 
 
 	crand "crypto/rand"
 	crand "crypto/rand"
-	"math/rand"
 
 
 	"github.com/coreos/etcd/elog"
 	"github.com/coreos/etcd/elog"
 	etcdErr "github.com/coreos/etcd/error"
 	etcdErr "github.com/coreos/etcd/error"
@@ -31,131 +28,12 @@ import (
 const (
 const (
 	keysPrefix     = "/v2/keys"
 	keysPrefix     = "/v2/keys"
 	machinesPrefix = "/v2/machines"
 	machinesPrefix = "/v2/machines"
-)
-
-type Peers map[int64][]string
-
-func (ps Peers) Pick(id int64) string {
-	addrs := ps[id]
-	if len(addrs) == 0 {
-		return ""
-	}
-	return addScheme(addrs[rand.Intn(len(addrs))])
-}
-
-// TODO: improve this when implementing TLS
-func addScheme(addr string) string {
-	return fmt.Sprintf("http://%s", addr)
-}
-
-// Set parses command line sets of names to ips formatted like:
-// a=1.1.1.1&a=1.1.1.2&b=2.2.2.2
-func (ps *Peers) Set(s string) error {
-	m := make(map[int64][]string)
-	v, err := url.ParseQuery(s)
-	if err != nil {
-		return err
-	}
-	for k, v := range v {
-		id, err := strconv.ParseInt(k, 0, 64)
-		if err != nil {
-			return err
-		}
-		m[id] = v
-	}
-	*ps = m
-	return nil
-}
-
-func (ps *Peers) String() string {
-	v := url.Values{}
-	for k, vv := range *ps {
-		for i := range vv {
-			v.Add(strconv.FormatInt(k, 16), vv[i])
-		}
-	}
-	return v.Encode()
-}
-
-func (ps Peers) IDs() []int64 {
-	var ids []int64
-	for id := range ps {
-		ids = append(ids, id)
-	}
-	return ids
-}
-
-// Endpoints returns a list of all peer addresses. Each address is
-// prefixed with "http://". The returned list is sorted (asc).
-func (ps Peers) Endpoints() []string {
-	endpoints := make([]string, 0)
-	for _, addrs := range ps {
-		for _, addr := range addrs {
-			endpoints = append(endpoints, addScheme(addr))
-		}
-	}
-	sort.Strings(endpoints)
 
 
-	return endpoints
-}
+	DefaultTimeout = 500 * time.Millisecond
+)
 
 
 var errClosed = errors.New("etcdhttp: client closed connection")
 var errClosed = errors.New("etcdhttp: client closed connection")
 
 
-const DefaultTimeout = 500 * time.Millisecond
-
-func Sender(p Peers) func(msgs []raftpb.Message) {
-	return func(msgs []raftpb.Message) {
-		for _, m := range msgs {
-			// TODO: reuse go routines
-			// limit the number of outgoing connections for the same receiver
-			go send(p, m)
-		}
-	}
-}
-
-func send(p Peers, m raftpb.Message) {
-	// TODO (xiangli): reasonable retry logic
-	for i := 0; i < 3; i++ {
-		url := p.Pick(m.To)
-		if url == "" {
-			// TODO: unknown peer id.. what do we do? I
-			// don't think his should ever happen, need to
-			// look into this further.
-			log.Println("etcdhttp: no addr for %d", m.To)
-			return
-		}
-
-		url += "/raft"
-
-		// TODO: don't block. we should be able to have 1000s
-		// of messages out at a time.
-		data, err := m.Marshal()
-		if err != nil {
-			log.Println("etcdhttp: dropping message:", err)
-			return // drop bad message
-		}
-		if httpPost(url, data) {
-			return // success
-		}
-		// TODO: backoff
-	}
-}
-
-func httpPost(url string, data []byte) bool {
-	// TODO: set timeouts
-	resp, err := http.Post(url, "application/protobuf", bytes.NewBuffer(data))
-	if err != nil {
-		elog.TODO()
-		return false
-	}
-	resp.Body.Close()
-	if resp.StatusCode != 200 {
-		elog.TODO()
-		return false
-	}
-	return true
-}
-
 // Handler implements the http.Handler interface and serves etcd client and
 // Handler implements the http.Handler interface and serves etcd client and
 // raft communication.
 // raft communication.
 type Handler struct {
 type Handler struct {

+ 139 - 0
etcdserver/etcdhttp/peers.go

@@ -0,0 +1,139 @@
+package etcdhttp
+
+import (
+	"bytes"
+	"fmt"
+	"log"
+	"math/rand"
+	"net/http"
+	"net/url"
+	"sort"
+	"strconv"
+
+	"github.com/coreos/etcd/elog"
+	"github.com/coreos/etcd/raft/raftpb"
+)
+
+// Peers contains a mapping of unique IDs to a list of hostnames/IP addresses
+type Peers map[int64][]string
+
+// addScheme adds the protocol prefix to a string; currently only HTTP
+// TODO: improve this when implementing TLS
+func addScheme(addr string) string {
+	return fmt.Sprintf("http://%s", addr)
+}
+
+// Pick chooses a random address from a given Peer's addresses, and returns it as
+// an addressible URI. If the given peer does not exist, an empty string is returned.
+func (ps Peers) Pick(id int64) string {
+	addrs := ps[id]
+	if len(addrs) == 0 {
+		return ""
+	}
+	return addScheme(addrs[rand.Intn(len(addrs))])
+}
+
+// Set parses command line sets of names to IPs formatted like:
+// a=1.1.1.1&a=1.1.1.2&b=2.2.2.2
+func (ps *Peers) Set(s string) error {
+	m := make(map[int64][]string)
+	v, err := url.ParseQuery(s)
+	if err != nil {
+		return err
+	}
+	for k, v := range v {
+		id, err := strconv.ParseInt(k, 0, 64)
+		if err != nil {
+			return err
+		}
+		m[id] = v
+	}
+	*ps = m
+	return nil
+}
+
+func (ps *Peers) String() string {
+	v := url.Values{}
+	for k, vv := range *ps {
+		for i := range vv {
+			v.Add(strconv.FormatInt(k, 16), vv[i])
+		}
+	}
+	return v.Encode()
+}
+
+func (ps Peers) IDs() []int64 {
+	var ids []int64
+	for id := range ps {
+		ids = append(ids, id)
+	}
+	return ids
+}
+
+// Endpoints returns a list of all peer addresses. Each address is prefixed
+// with the scheme (currently "http://"). The returned list is sorted in
+// ascending lexicographical order.
+func (ps Peers) Endpoints() []string {
+	endpoints := make([]string, 0)
+	for _, addrs := range ps {
+		for _, addr := range addrs {
+			endpoints = append(endpoints, addScheme(addr))
+		}
+	}
+	sort.Strings(endpoints)
+
+	return endpoints
+}
+
+func Sender(p Peers) func(msgs []raftpb.Message) {
+	return func(msgs []raftpb.Message) {
+		for _, m := range msgs {
+			// TODO: reuse go routines
+			// limit the number of outgoing connections for the same receiver
+			go send(p, m)
+		}
+	}
+}
+
+func send(p Peers, m raftpb.Message) {
+	// TODO (xiangli): reasonable retry logic
+	for i := 0; i < 3; i++ {
+		url := p.Pick(m.To)
+		if url == "" {
+			// TODO: unknown peer id.. what do we do? I
+			// don't think his should ever happen, need to
+			// look into this further.
+			log.Println("etcdhttp: no addr for %d", m.To)
+			return
+		}
+
+		url += "/raft"
+
+		// TODO: don't block. we should be able to have 1000s
+		// of messages out at a time.
+		data, err := m.Marshal()
+		if err != nil {
+			log.Println("etcdhttp: dropping message:", err)
+			return // drop bad message
+		}
+		if httpPost(url, data) {
+			return // success
+		}
+		// TODO: backoff
+	}
+}
+
+func httpPost(url string, data []byte) bool {
+	// TODO: set timeouts
+	resp, err := http.Post(url, "application/protobuf", bytes.NewBuffer(data))
+	if err != nil {
+		elog.TODO()
+		return false
+	}
+	resp.Body.Close()
+	if resp.StatusCode != 200 {
+		elog.TODO()
+		return false
+	}
+	return true
+}

+ 148 - 10
etcdserver/etcdhttp/peers_test.go

@@ -1,19 +1,157 @@
 package etcdhttp
 package etcdhttp
 
 
-import "testing"
+import (
+	"net/http"
+	"net/http/httptest"
+	"reflect"
+	"sort"
+	"testing"
+)
 
 
-//TODO: full testing for peer set
-func TestPeerSet(t *testing.T) {
-	p := &Peers{}
+func TestPeers(t *testing.T) {
+	tests := []struct {
+		in      string
+		wids    []int64
+		wep     []string
+		wstring string
+	}{
+		{
+			"1=1.1.1.1",
+			[]int64{1},
+			[]string{"http://1.1.1.1"},
+			"1=1.1.1.1",
+		},
+		{
+			"2=2.2.2.2",
+			[]int64{2},
+			[]string{"http://2.2.2.2"},
+			"2=2.2.2.2",
+		},
+		{
+			"1=1.1.1.1&1=1.1.1.2&2=2.2.2.2",
+			[]int64{1, 2},
+			[]string{"http://1.1.1.1", "http://1.1.1.2", "http://2.2.2.2"},
+			"1=1.1.1.1&1=1.1.1.2&2=2.2.2.2",
+		},
+		{
+			"3=3.3.3.3&4=4.4.4.4&1=1.1.1.1&1=1.1.1.2&2=2.2.2.2",
+			[]int64{1, 2, 3, 4},
+			[]string{"http://1.1.1.1", "http://1.1.1.2", "http://2.2.2.2",
+				"http://3.3.3.3", "http://4.4.4.4"},
+			"1=1.1.1.1&1=1.1.1.2&2=2.2.2.2&3=3.3.3.3&4=4.4.4.4",
+		},
+	}
+	for i, tt := range tests {
+		p := &Peers{}
+		err := p.Set(tt.in)
+		if err != nil {
+			t.Errorf("#%d: err=%v, want nil", i, err)
+		}
+		ids := int64Slice(p.IDs())
+		sort.Sort(ids)
+		if !reflect.DeepEqual([]int64(ids), tt.wids) {
+			t.Errorf("#%d: IDs=%#v, want %#v", i, []int64(ids), tt.wids)
+		}
+		ep := p.Endpoints()
+		if !reflect.DeepEqual(ep, tt.wep) {
+			t.Errorf("#%d: Endpoints=%#v, want %#v", i, ep, tt.wep)
+		}
+		s := p.String()
+		if s != tt.wstring {
+			t.Errorf("#%d: string=%q, want %q", i, s, tt.wstring)
+		}
+	}
+}
+
+type int64Slice []int64
+
+func (p int64Slice) Len() int           { return len(p) }
+func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] }
+func (p int64Slice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
+
+func TestPeersSetBad(t *testing.T) {
 	tests := []string{
 	tests := []string{
-		"1=1.1.1.1",
-		"2=2.2.2.2",
-		"1=1.1.1.1&1=1.1.1.2&2=2.2.2.2",
+		// garbage URL
+		"asdf%%",
+		// non-int64 keys
+		"a=1.2.3.4",
+		"-1-23=1.2.3.4",
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
-		p.Set(tt)
-		if p.String() != tt {
-			t.Errorf("#%d: string = %s, want %s", i, p.String(), tt)
+		p := &Peers{}
+		if err := p.Set(tt); err == nil {
+			t.Errorf("#%d: err=nil unexpectedly", i)
 		}
 		}
 	}
 	}
 }
 }
+
+func TestPeersPick(t *testing.T) {
+	ps := &Peers{
+		1: []string{"abc", "def", "ghi", "jkl", "mno", "pqr", "stu"},
+		2: []string{"xyz"},
+		3: []string{},
+	}
+	ids := map[string]bool{
+		"http://abc": true,
+		"http://def": true,
+		"http://ghi": true,
+		"http://jkl": true,
+		"http://mno": true,
+		"http://pqr": true,
+		"http://stu": true,
+	}
+	for i := 0; i < 1000; i++ {
+		a := ps.Pick(1)
+		if _, ok := ids[a]; !ok {
+			t.Errorf("returned ID %q not in expected range!", a)
+			break
+		}
+	}
+	if b := ps.Pick(2); b != "http://xyz" {
+		t.Errorf("id=%q, want %q", b, "http://xyz")
+	}
+	if c := ps.Pick(3); c != "" {
+		t.Errorf("id=%q, want \"\"", c)
+	}
+}
+
+func TestHttpPost(t *testing.T) {
+	var tr *http.Request
+	tests := []struct {
+		h http.HandlerFunc
+		w bool
+	}{
+		{
+			http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				tr = r
+				w.WriteHeader(200)
+			}),
+			true,
+		},
+		{
+			http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				tr = r
+				w.WriteHeader(404)
+			}),
+			false,
+		},
+	}
+	for i, tt := range tests {
+		ts := httptest.NewServer(tt.h)
+		if g := httpPost(ts.URL, []byte("adsf")); g != tt.w {
+			t.Errorf("#%d: httpPost()=%t, want %t", i, g, tt.w)
+		}
+		if tr.Method != "POST" {
+			t.Errorf("#%d: Method=%q, want %q", i, tr.Method, "POST")
+		}
+		if ct := tr.Header.Get("Content-Type"); ct != "application/protobuf" {
+			t.Errorf("%#d: Content-Type=%q, want %q", ct, "application/protobuf")
+		}
+		tr = nil
+		ts.Close()
+	}
+
+	if httpPost("garbage url", []byte("data")) {
+		t.Errorf("httpPost with bad URL returned true unexpectedly!")
+	}
+}