Browse Source

discovery: init commit

Xiang Li 11 years ago
parent
commit
2b623cf0fa
5 changed files with 478 additions and 5 deletions
  1. 3 3
      client/client.go
  2. 2 2
      client/http.go
  3. 136 0
      discovery/discovery.go
  4. 317 0
      discovery/discovery_test.go
  5. 20 0
      discovery/doc.go

+ 3 - 3
client/client.go

@@ -16,8 +16,8 @@ var (
 type Client interface {
 	Create(key, value string, ttl time.Duration) (*Response, error)
 	Get(key string) (*Response, error)
-	Watch(key string) Watcher
-	RecursiveWatch(key string) Watcher
+	Watch(key string, idx uint64) Watcher
+	RecursiveWatch(key string, idx uint64) Watcher
 }
 
 type Watcher interface {
@@ -30,7 +30,7 @@ type Response struct {
 	PrevNode *Node  `json:"prevNode"`
 }
 
-type Nodes []Node
+type Nodes []*Node
 type Node struct {
 	Key           string `json:"key"`
 	Value         string `json:"value"`

+ 2 - 2
client/http.go

@@ -127,7 +127,7 @@ func (c *httpClient) do(ctx context.Context, act httpAction) (*http.Response, []
 	return resp, body, err
 }
 
-func (c *httpClient) Watch(key string, idx uint64) *httpWatcher {
+func (c *httpClient) Watch(key string, idx uint64) Watcher {
 	return &httpWatcher{
 		httpClient: *c,
 		nextWait: waitAction{
@@ -138,7 +138,7 @@ func (c *httpClient) Watch(key string, idx uint64) *httpWatcher {
 	}
 }
 
-func (c *httpClient) RecursiveWatch(key string, idx uint64) *httpWatcher {
+func (c *httpClient) RecursiveWatch(key string, idx uint64) Watcher {
 	return &httpWatcher{
 		httpClient: *c,
 		nextWait: waitAction{

+ 136 - 0
discovery/discovery.go

@@ -0,0 +1,136 @@
+package discovery
+
+import (
+	"errors"
+	"fmt"
+	"path"
+	"sort"
+	"strconv"
+	"strings"
+
+	"github.com/coreos/etcd/client"
+	"github.com/coreos/etcd/etcdserver/etcdhttp"
+)
+
+var (
+	ErrInvalidURL    = errors.New("discovery: invalid URL")
+	ErrBadCluster    = errors.New("discovery: bad key/value inside cluster")
+	ErrSizeNotFound  = errors.New("discovery: size key not found")
+	ErrTokenNotFound = errors.New("discovery: token not found")
+	ErrDuplicateID   = errors.New("discovery: found duplicate id")
+	ErrFullCluster   = errors.New("discovery: cluster is full")
+)
+
+type discovery struct {
+	cluster string
+	id      int64
+	ctx     []byte
+	c       client.Client
+}
+
+func (d *discovery) discover() (*etcdhttp.Peers, error) {
+	if err := d.createSelf(); err != nil {
+		return nil, err
+	}
+
+	nodes, size, err := d.checkCluster()
+	if err != nil {
+		return nil, err
+	}
+
+	all, err := d.waitNodes(nodes, size)
+	if err != nil {
+		return nil, err
+	}
+
+	return nodesToPeers(all)
+}
+
+func (d *discovery) createSelf() error {
+	self := path.Join("/", d.cluster, fmt.Sprintf("%d", d.id))
+	// create self key
+	resp, err := d.c.Create(self, string(d.ctx), 0)
+	if err != nil {
+		return err
+	}
+
+	// ensure self appears on the server we connected to
+	w := d.c.Watch(self, resp.Node.CreatedIndex)
+	if _, err = w.Next(); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (d *discovery) checkCluster() (client.Nodes, int, error) {
+	self := path.Join("/", d.cluster, fmt.Sprintf("%d", d.id))
+	resp, err := d.c.Get(d.cluster)
+	if err != nil {
+		return nil, 0, err
+	}
+	nodes := resp.Node.Nodes
+	snodes := SortableNodes{nodes}
+	sort.Sort(snodes)
+
+	// find cluster size
+	if nodes[0].Key != path.Join("/", d.cluster, "size") {
+		return nil, 0, ErrSizeNotFound
+	}
+	size, err := strconv.Atoi(nodes[0].Value)
+	if err != nil {
+		return nil, 0, ErrBadCluster
+	}
+
+	// remove size key from nodes
+	nodes = nodes[1:]
+
+	// find self position
+	for i := range nodes {
+		if nodes[i].Key == self {
+			break
+		}
+		if i >= size-1 {
+			return nil, size, ErrFullCluster
+		}
+	}
+	return nodes, size, nil
+}
+
+func (d *discovery) waitNodes(nodes client.Nodes, size int) (client.Nodes, error) {
+	if len(nodes) > size {
+		nodes = nodes[:size]
+	}
+	w := d.c.RecursiveWatch(d.cluster, nodes[len(nodes)-1].ModifiedIndex)
+	all := make(client.Nodes, len(nodes))
+	copy(all, nodes)
+	// wait for others
+	for len(all) < size {
+		resp, err := w.Next()
+		if err != nil {
+			return nil, err
+		}
+		all = append(all, resp.Node)
+	}
+	return all, nil
+}
+
+func nodesToPeers(ns client.Nodes) (*etcdhttp.Peers, error) {
+	s := make([]string, len(ns))
+	for i, n := range ns {
+		s[i] = n.Value
+	}
+
+	var peers etcdhttp.Peers
+	if err := peers.Set(strings.Join(s, "&")); err != nil {
+		return nil, err
+	}
+	return &peers, nil
+}
+
+type SortableNodes struct{ client.Nodes }
+
+func (ns SortableNodes) Len() int { return len(ns.Nodes) }
+func (ns SortableNodes) Less(i, j int) bool {
+	return ns.Nodes[i].CreatedIndex < ns.Nodes[j].CreatedIndex
+}
+func (ns SortableNodes) Swap(i, j int) { ns.Nodes[i], ns.Nodes[j] = ns.Nodes[j], ns.Nodes[i] }

+ 317 - 0
discovery/discovery_test.go

@@ -0,0 +1,317 @@
+package discovery
+
+import (
+	"errors"
+
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/client"
+	"github.com/coreos/etcd/etcdserver/etcdhttp"
+)
+
+func TestCheckCluster(t *testing.T) {
+	cluster := "1000"
+	self := "/1000/1"
+
+	tests := []struct {
+		nodes []*client.Node
+		werr  error
+		wsize int
+	}{
+		{
+			// self is in the size range
+			client.Nodes{
+				{Key: "/1000/size", Value: "3", CreatedIndex: 1},
+				{Key: self, CreatedIndex: 2},
+				{Key: "/1000/2", CreatedIndex: 3},
+				{Key: "/1000/3", CreatedIndex: 4},
+				{Key: "/1000/4", CreatedIndex: 5},
+			},
+			nil,
+			3,
+		},
+		{
+			// self is in the size range
+			client.Nodes{
+				{Key: "/1000/size", Value: "3", CreatedIndex: 1},
+				{Key: "/1000/2", CreatedIndex: 2},
+				{Key: "/1000/3", CreatedIndex: 3},
+				{Key: self, CreatedIndex: 4},
+				{Key: "/1000/4", CreatedIndex: 5},
+			},
+			nil,
+			3,
+		},
+		{
+			// self is out of the size range
+			client.Nodes{
+				{Key: "/1000/size", Value: "3", CreatedIndex: 1},
+				{Key: "/1000/2", CreatedIndex: 2},
+				{Key: "/1000/3", CreatedIndex: 3},
+				{Key: "/1000/4", CreatedIndex: 4},
+				{Key: self, CreatedIndex: 5},
+			},
+			ErrFullCluster,
+			3,
+		},
+		{
+			// self is not in the cluster
+			client.Nodes{
+				{Key: "/1000/size", Value: "3", CreatedIndex: 1},
+				{Key: "/1000/2", CreatedIndex: 2},
+				{Key: "/1000/3", CreatedIndex: 3},
+			},
+			nil,
+			3,
+		},
+		{
+			client.Nodes{
+				{Key: "/1000/size", Value: "3", CreatedIndex: 1},
+				{Key: "/1000/2", CreatedIndex: 2},
+				{Key: "/1000/3", CreatedIndex: 3},
+				{Key: "/1000/4", CreatedIndex: 4},
+			},
+			ErrFullCluster,
+			3,
+		},
+		{
+			// bad size key
+			client.Nodes{
+				{Key: "/1000/size", Value: "bad", CreatedIndex: 1},
+			},
+			ErrBadCluster,
+			0,
+		},
+		{
+			// no size key
+			client.Nodes{
+				{Key: self, CreatedIndex: 1},
+			},
+			ErrSizeNotFound,
+			0,
+		},
+	}
+
+	for i, tt := range tests {
+		resp := &client.Response{
+			Node: &client.Node{
+				Key:   cluster,
+				Nodes: tt.nodes,
+			},
+		}
+
+		c := &clientWithResp{
+			rs: []*client.Response{resp},
+		}
+
+		d := discovery{cluster: cluster, id: 1, c: c}
+		ns, size, err := d.checkCluster()
+		if err != tt.werr {
+			t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
+		}
+		if reflect.DeepEqual(ns, tt.nodes) {
+			t.Errorf("#%d: nodes = %v, want %v", i, ns, tt.nodes)
+		}
+		if size != tt.wsize {
+			t.Errorf("#%d: size = %v, want %d", i, size, tt.wsize)
+		}
+	}
+}
+
+func TestWaitNodes(t *testing.T) {
+	all := client.Nodes{
+		{Key: "/1000/1", CreatedIndex: 2},
+		{Key: "/1000/2", CreatedIndex: 3},
+		{Key: "/1000/3", CreatedIndex: 4},
+	}
+
+	tests := []struct {
+		nodes client.Nodes
+		size  int
+		rs    []*client.Response
+
+		werr error
+		wall client.Nodes
+	}{
+		{
+			all,
+			3,
+			[]*client.Response{},
+			nil,
+			all,
+		},
+		{
+			all[:1],
+			3,
+			[]*client.Response{
+				{Node: &client.Node{Key: "/1000/2", CreatedIndex: 3}},
+				{Node: &client.Node{Key: "/1000/3", CreatedIndex: 4}},
+			},
+			nil,
+			all,
+		},
+		{
+			all[:2],
+			3,
+			[]*client.Response{
+				{Node: &client.Node{Key: "/1000/3", CreatedIndex: 4}},
+			},
+			nil,
+			all,
+		},
+	}
+
+	for i, tt := range tests {
+		c := &clientWithResp{nil, &watcherWithResp{tt.rs}}
+		d := &discovery{cluster: "1000", c: c}
+		g, err := d.waitNodes(tt.nodes, tt.size)
+		if err != tt.werr {
+			t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
+		}
+		if !reflect.DeepEqual(g, tt.wall) {
+			t.Errorf("#%d: all = %v, want %v", i, g, tt.wall)
+		}
+	}
+}
+
+func TestCreateSelf(t *testing.T) {
+	rs := []*client.Response{{Node: &client.Node{Key: "1000/1", CreatedIndex: 2}}}
+
+	w := &watcherWithResp{rs}
+	errw := &watcherWithErr{errors.New("watch err")}
+
+	c := &clientWithResp{rs, w}
+	errc := &clientWithErr{errors.New("create err"), w}
+	errwc := &clientWithResp{rs, errw}
+
+	tests := []struct {
+		c    client.Client
+		werr error
+	}{
+		// no error
+		{c, nil},
+		// client.create returns an error
+		{errc, errc.err},
+		// watcher.next retuens an error
+		{errwc, errw.err},
+	}
+
+	for i, tt := range tests {
+		d := discovery{cluster: "1000", c: tt.c}
+		if err := d.createSelf(); err != tt.werr {
+			t.Errorf("#%d: err = %v, want %v", i, err, nil)
+		}
+	}
+}
+
+func TestNodesToPeers(t *testing.T) {
+	nodes := client.Nodes{
+		{Key: "/1000/1", Value: "1=1.1.1.1", CreatedIndex: 1},
+		{Key: "/1000/2", Value: "2=2.2.2.2", CreatedIndex: 2},
+		{Key: "/1000/3", Value: "3=3.3.3.3", CreatedIndex: 3},
+	}
+	w := &etcdhttp.Peers{}
+	w.Set("1=1.1.1.1&2=2.2.2.2&3=3.3.3.3")
+
+	badnodes := client.Nodes{{Key: "1000/1", Value: "1=1.1.1.1&???", CreatedIndex: 1}}
+
+	tests := []struct {
+		ns client.Nodes
+		wp *etcdhttp.Peers
+		we bool
+	}{
+		{nodes, w, false},
+		{badnodes, nil, true},
+	}
+
+	for i, tt := range tests {
+		peers, err := nodesToPeers(tt.ns)
+		if tt.we {
+			if err == nil {
+				t.Fatalf("#%d: err = %v, want not nil", i, err)
+			}
+		} else {
+			if err != nil {
+				t.Fatalf("#%d: err = %v, want nil", i, err)
+			}
+		}
+		if !reflect.DeepEqual(peers, tt.wp) {
+			t.Errorf("#%d: peers = %v, want %v", i, peers, tt.wp)
+		}
+	}
+}
+
+type clientWithResp struct {
+	rs []*client.Response
+	w  client.Watcher
+}
+
+func (c *clientWithResp) Create(key string, value string, ttl time.Duration) (*client.Response, error) {
+	if len(c.rs) == 0 {
+		return &client.Response{}, nil
+	}
+	r := c.rs[0]
+	c.rs = c.rs[1:]
+	return r, nil
+}
+
+func (c *clientWithResp) Get(key string) (*client.Response, error) {
+	if len(c.rs) == 0 {
+		return &client.Response{}, nil
+	}
+	r := c.rs[0]
+	c.rs = c.rs[1:]
+	return r, nil
+}
+
+func (c *clientWithResp) Watch(key string, waitIndex uint64) client.Watcher {
+	return c.w
+}
+
+func (c *clientWithResp) RecursiveWatch(key string, waitIndex uint64) client.Watcher {
+	return c.w
+}
+
+type clientWithErr struct {
+	err error
+	w   client.Watcher
+}
+
+func (c *clientWithErr) Create(key string, value string, ttl time.Duration) (*client.Response, error) {
+	return &client.Response{}, c.err
+}
+
+func (c *clientWithErr) Get(key string) (*client.Response, error) {
+	return &client.Response{}, c.err
+}
+
+func (c *clientWithErr) Watch(key string, waitIndex uint64) client.Watcher {
+	return c.w
+}
+
+func (c *clientWithErr) RecursiveWatch(key string, waitIndex uint64) client.Watcher {
+	return c.w
+}
+
+type watcherWithResp struct {
+	rs []*client.Response
+}
+
+func (w *watcherWithResp) Next() (*client.Response, error) {
+	if len(w.rs) == 0 {
+		return &client.Response{}, nil
+	}
+	r := w.rs[0]
+	w.rs = w.rs[1:]
+	return r, nil
+}
+
+type watcherWithErr struct {
+	err error
+}
+
+func (w *watcherWithErr) Next() (*client.Response, error) {
+	return &client.Response{}, w.err
+}

+ 20 - 0
discovery/doc.go

@@ -0,0 +1,20 @@
+// Copyright 2014 CoreOS Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//  http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/*
+Package discovery provides an implementation of the cluster discovery that
+is used by etcd.
+
+*/
+package discovery