Browse Source

*:discovery hook up

Xiang Li 11 years ago
parent
commit
9e3d045b2b
5 changed files with 112 additions and 63 deletions
  1. 9 3
      client/http.go
  2. 42 19
      discovery/discovery.go
  3. 5 29
      discovery/discovery_test.go
  4. 21 6
      etcdserver/server.go
  5. 35 6
      main.go

+ 9 - 3
client/http.go

@@ -14,7 +14,7 @@ import (
 	"github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
 )
 
-const (
+var (
 	v2Prefix = "/v2/keys"
 )
 
@@ -47,12 +47,18 @@ func NewHTTPClient(tr *http.Transport, ep string, timeout time.Duration) (*httpC
 	return c, nil
 }
 
+func (c *httpClient) SetPrefix(p string) {
+	v2Prefix = p
+}
+
 func (c *httpClient) Create(key, val string, ttl time.Duration) (*Response, error) {
-	uintTTL := uint64(ttl.Seconds())
 	create := &createAction{
 		Key:   key,
 		Value: val,
-		TTL:   &uintTTL,
+	}
+	if ttl >= 0 {
+		uttl := uint64(ttl.Seconds())
+		create.TTL = &uttl
 	}
 
 	ctx, cancel := context.WithTimeout(context.Background(), c.timeout)

+ 42 - 19
discovery/discovery.go

@@ -3,13 +3,15 @@ package discovery
 import (
 	"errors"
 	"fmt"
+	"net/http"
+	"net/url"
 	"path"
 	"sort"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/coreos/etcd/client"
-	"github.com/coreos/etcd/etcdserver/etcdhttp"
 )
 
 var (
@@ -21,40 +23,66 @@ var (
 	ErrFullCluster   = errors.New("discovery: cluster is full")
 )
 
+type Discoverer interface {
+	Discover() (string, error)
+}
+
 type discovery struct {
 	cluster string
 	id      int64
-	ctx     []byte
+	config  string
 	c       client.Client
 }
 
-func (d *discovery) discover() (*etcdhttp.Peers, error) {
+func New(durl string, id int64, config string) (Discoverer, error) {
+	u, err := url.Parse(durl)
+	if err != nil {
+		return nil, err
+	}
+	token := u.Path
+	u.Path = ""
+	client, err := client.NewHTTPClient(&http.Transport{}, u.String(), time.Second*5)
+	if err != nil {
+		return nil, err
+	}
+	// discovery service redirects /[key] to /v2/keys/[key]
+	// set the prefix of client to "" to handle this
+	client.SetPrefix("")
+	return &discovery{
+		cluster: token,
+		id:      id,
+		config:  config,
+		c:       client,
+	}, nil
+}
+
+func (d *discovery) Discover() (string, error) {
 	// fast path: if the cluster is full, returns the error
 	// do not need to register itself to the cluster in this
 	// case.
 	if _, _, err := d.checkCluster(); err != nil {
-		return nil, err
+		return "", err
 	}
 
 	if err := d.createSelf(); err != nil {
-		return nil, err
+		return "", err
 	}
 
 	nodes, size, err := d.checkCluster()
 	if err != nil {
-		return nil, err
+		return "", err
 	}
 
 	all, err := d.waitNodes(nodes, size)
 	if err != nil {
-		return nil, err
+		return "", err
 	}
 
-	return nodesToPeers(all)
+	return nodesToCluster(all), nil
 }
 
 func (d *discovery) createSelf() error {
-	resp, err := d.c.Create(d.selfKey(), string(d.ctx), 0)
+	resp, err := d.c.Create(d.selfKey(), d.config, -1)
 	if err != nil {
 		return err
 	}
@@ -87,7 +115,7 @@ func (d *discovery) checkCluster() (client.Nodes, int, error) {
 	nodes := make(client.Nodes, 0)
 	// append non-config keys to nodes
 	for _, n := range resp.Node.Nodes {
-		if !strings.HasPrefix(n.Key, configKey) {
+		if !strings.Contains(n.Key, configKey) {
 			nodes = append(nodes, n)
 		}
 	}
@@ -97,7 +125,7 @@ func (d *discovery) checkCluster() (client.Nodes, int, error) {
 
 	// find self position
 	for i := range nodes {
-		if nodes[i].Key == d.selfKey() {
+		if strings.Contains(nodes[i].Key, d.selfKey()) {
 			break
 		}
 		if i >= size-1 {
@@ -111,7 +139,7 @@ func (d *discovery) waitNodes(nodes client.Nodes, size int) (client.Nodes, error
 	if len(nodes) > size {
 		nodes = nodes[:size]
 	}
-	w := d.c.RecursiveWatch(d.cluster, nodes[len(nodes)-1].ModifiedIndex)
+	w := d.c.RecursiveWatch(d.cluster, nodes[len(nodes)-1].ModifiedIndex+1)
 	all := make(client.Nodes, len(nodes))
 	copy(all, nodes)
 	// wait for others
@@ -129,17 +157,12 @@ func (d *discovery) selfKey() string {
 	return path.Join("/", d.cluster, fmt.Sprintf("%d", d.id))
 }
 
-func nodesToPeers(ns client.Nodes) (*etcdhttp.Peers, error) {
+func nodesToCluster(ns client.Nodes) string {
 	s := make([]string, len(ns))
 	for i, n := range ns {
 		s[i] = n.Value
 	}
-
-	var peers etcdhttp.Peers
-	if err := peers.Set(strings.Join(s, "&")); err != nil {
-		return nil, err
-	}
-	return &peers, nil
+	return strings.Join(s, ",")
 }
 
 type sortableNodes struct{ client.Nodes }

+ 5 - 29
discovery/discovery_test.go

@@ -10,7 +10,6 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/client"
-	"github.com/coreos/etcd/etcdserver/etcdhttp"
 )
 
 func TestCheckCluster(t *testing.T) {
@@ -216,40 +215,17 @@ func TestCreateSelf(t *testing.T) {
 	}
 }
 
-func TestNodesToPeers(t *testing.T) {
+func TestNodesToCluster(t *testing.T) {
 	nodes := client.Nodes{
 		{Key: "/1000/1", Value: "1=1.1.1.1", CreatedIndex: 1},
 		{Key: "/1000/2", Value: "2=2.2.2.2", CreatedIndex: 2},
 		{Key: "/1000/3", Value: "3=3.3.3.3", CreatedIndex: 3},
 	}
-	w := &etcdhttp.Peers{}
-	w.Set("1=1.1.1.1&2=2.2.2.2&3=3.3.3.3")
+	w := "1=1.1.1.1,2=2.2.2.2,3=3.3.3.3"
 
-	badnodes := client.Nodes{{Key: "1000/1", Value: "1=1.1.1.1&???", CreatedIndex: 1}}
-
-	tests := []struct {
-		ns client.Nodes
-		wp *etcdhttp.Peers
-		we bool
-	}{
-		{nodes, w, false},
-		{badnodes, nil, true},
-	}
-
-	for i, tt := range tests {
-		peers, err := nodesToPeers(tt.ns)
-		if tt.we {
-			if err == nil {
-				t.Fatalf("#%d: err = %v, want not nil", i, err)
-			}
-		} else {
-			if err != nil {
-				t.Fatalf("#%d: err = %v, want nil", i, err)
-			}
-		}
-		if !reflect.DeepEqual(peers, tt.wp) {
-			t.Errorf("#%d: peers = %v, want %v", i, peers, tt.wp)
-		}
+	cluster := nodesToCluster(nodes)
+	if !reflect.DeepEqual(cluster, w) {
+		t.Errorf("cluster = %v, want %v", cluster, w)
 	}
 }
 

+ 21 - 6
etcdserver/server.go

@@ -11,6 +11,7 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/coreos/etcd/discovery"
 	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
 	"github.com/coreos/etcd/pkg/types"
 	"github.com/coreos/etcd/raft"
@@ -84,12 +85,13 @@ type RaftTimer interface {
 }
 
 type ServerConfig struct {
-	Name       string
-	ClientURLs types.URLs
-	DataDir    string
-	SnapCount  int64
-	Cluster    *Cluster
-	Transport  *http.Transport
+	Name         string
+	DiscoveryURL string
+	ClientURLs   types.URLs
+	DataDir      string
+	SnapCount    int64
+	Cluster      *Cluster
+	Transport    *http.Transport
 }
 
 // NewServer creates a new EtcdServer from the supplied configuration. The
@@ -111,6 +113,19 @@ func NewServer(cfg *ServerConfig) *EtcdServer {
 	var err error
 	waldir := path.Join(cfg.DataDir, "wal")
 	if !wal.Exist(waldir) {
+		if cfg.DiscoveryURL != "" {
+			d, err := discovery.New(cfg.DiscoveryURL, m.ID, cfg.Cluster.String())
+			if err != nil {
+				log.Fatalf("etcd: cannot init discovery %v", err)
+			}
+			s, err := d.Discover()
+			if err != nil {
+				log.Fatalf("etcd: %v", err)
+			}
+			if err = cfg.Cluster.Set(s); err != nil {
+				log.Fatalf("etcd: %v", err)
+			}
+		}
 		if w, err = wal.Create(waldir); err != nil {
 			log.Fatal(err)
 		}

+ 35 - 6
main.go

@@ -27,6 +27,7 @@ const (
 var (
 	name         = flag.String("name", "default", "Unique human-readable name for this node")
 	dir          = flag.String("data-dir", "", "Path to the data directory")
+	durl         = flag.String("discovery", "", "Discovery service used to bootstrap the cluster")
 	snapCount    = flag.Uint64("snapshot-count", etcdserver.DefaultSnapCount, "Number of committed transactions to trigger a snapshot")
 	printVersion = flag.Bool("version", false, "Print the version and exit")
 
@@ -97,6 +98,9 @@ func main() {
 	}
 
 	pkg.SetFlagsFromEnv(flag.CommandLine)
+	if err := setClusterForDiscovery(); err != nil {
+		log.Fatalf("etcd: %v", err)
+	}
 
 	if string(*proxyFlag) == flagtypes.ProxyValueOff {
 		startEtcd()
@@ -137,12 +141,13 @@ func startEtcd() {
 		log.Fatal(err.Error())
 	}
 	cfg := &etcdserver.ServerConfig{
-		Name:       *name,
-		ClientURLs: acurls,
-		DataDir:    *dir,
-		SnapCount:  int64(*snapCount),
-		Cluster:    cluster,
-		Transport:  pt,
+		Name:         *name,
+		ClientURLs:   acurls,
+		DataDir:      *dir,
+		SnapCount:    int64(*snapCount),
+		Cluster:      cluster,
+		Transport:    pt,
+		DiscoveryURL: *durl,
 	}
 	s := etcdserver.NewServer(cfg)
 	s.Start()
@@ -231,3 +236,27 @@ func startProxy() {
 		}()
 	}
 }
+
+// setClusterForDiscovery sets cluster to a temporary value if you are using
+// the discovery.
+func setClusterForDiscovery() error {
+	set := make(map[string]bool)
+	flag.Visit(func(f *flag.Flag) {
+		set[f.Name] = true
+	})
+	if set["discovery"] && set["bootstrap-config"] {
+		return fmt.Errorf("both discovery and bootstrap-config are set")
+	}
+	if set["discovery"] {
+		apurls, err := pkg.URLsFromFlags(flag.CommandLine, "advertise-peer-urls", "addr", peerTLSInfo)
+		if err != nil {
+			return err
+		}
+		addrs := make([]string, len(apurls))
+		for i := range apurls {
+			addrs[i] = apurls[i].String()
+		}
+		cluster.Set(fmt.Sprintf("%s=%s", *name, strings.Join(addrs, ",")))
+	}
+	return nil
+}