Browse Source

server: add discovery

Yicheng Qin 11 years ago
parent
commit
c952e91c4f
3 changed files with 168 additions and 5 deletions
  1. 134 0
      etcd/discovery.go
  2. 29 3
      etcd/etcd.go
  3. 5 2
      etcd/peer_hub.go

+ 134 - 0
etcd/discovery.go

@@ -0,0 +1,134 @@
+/*
+Copyright 2014 CoreOS Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package etcd
+
+import (
+	"errors"
+	"fmt"
+	"log"
+	"net/url"
+	"path"
+	"strings"
+	"time"
+
+	etcdErr "github.com/coreos/etcd/error"
+	"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
+)
+
+const (
+	stateKey     = "_state"
+	startedState = "started"
+	defaultTTL   = 604800 // One week TTL
+)
+
+type discoverer struct {
+	client *etcd.Client
+	name   string
+	addr   string
+	prefix string
+}
+
+func newDiscoverer(u *url.URL, name, raftPubAddr string) *discoverer {
+	d := &discoverer{name: name, addr: raftPubAddr}
+
+	// prefix is prepended to all keys for this discovery
+	d.prefix = strings.TrimPrefix(u.Path, "/v2/keys/")
+
+	// keep the old path in case we need to set the KeyPrefix below
+	oldPath := u.Path
+	u.Path = ""
+
+	// Connect to a scheme://host not a full URL with path
+	log.Println("Discovery via %s using prefix %s.", u.String(), d.prefix)
+	d.client = etcd.NewClient([]string{u.String()})
+
+	if !strings.HasPrefix(oldPath, "/v2/keys") {
+		d.client.SetKeyPrefix("")
+	}
+	return d
+}
+
+func (d *discoverer) discover() ([]string, error) {
+	if _, err := d.client.Set(path.Join(d.prefix, d.name), d.addr, defaultTTL); err != nil {
+		return nil, err
+	}
+
+	// Attempt to take the leadership role, if there is no error we are it!
+	resp, err := d.client.Create(path.Join(d.prefix, stateKey), startedState, 0)
+	// Bail out on unexpected errors
+	if err != nil {
+		if clientErr, ok := err.(*etcd.EtcdError); !ok || clientErr.ErrorCode != etcdErr.EcodeNodeExist {
+			return nil, err
+		}
+	}
+
+	// If we got a response then the CAS was successful, we are leader
+	if resp != nil && resp.Node.Value == startedState {
+		// We are the leader, we have no peers
+		log.Println("Discovery _state was empty, so this machine is the initial leader.")
+		return nil, nil
+	}
+
+	// Fall through to finding the other discovery peers
+	return d.findPeers()
+}
+
+func (d *discoverer) findPeers() (peers []string, err error) {
+	resp, err := d.client.Get(path.Join(d.prefix), false, true)
+	if err != nil {
+		return nil, err
+	}
+
+	node := resp.Node
+
+	if node == nil {
+		return nil, fmt.Errorf("%s key doesn't exist.", d.prefix)
+	}
+
+	for _, n := range node.Nodes {
+		// Skip our own entry in the list, there is no point
+		if strings.HasSuffix(n.Key, "/"+d.name) {
+			continue
+		}
+		peers = append(peers, n.Value)
+	}
+
+	if len(peers) == 0 {
+		return nil, errors.New("Discovery found an initialized cluster but no reachable peers are registered.")
+	}
+
+	log.Printf("Discovery found peers %v\n", peers)
+	return
+}
+
+func (d *discoverer) heartbeat(stopc <-chan struct{}) {
+	// In case of errors we should attempt to heartbeat fairly frequently
+	heartbeatInterval := defaultTTL / 8
+	ticker := time.NewTicker(time.Second * time.Duration(heartbeatInterval))
+	defer ticker.Stop()
+	for {
+		if _, err := d.client.Set(path.Join(d.prefix, d.name), d.addr, defaultTTL); err != nil {
+			log.Println("Discovery heartbeat failed: %v", err)
+		}
+
+		select {
+		case <-ticker.C:
+		case <-stopc:
+			return
+		}
+	}
+}

+ 29 - 3
etcd/etcd.go

@@ -18,8 +18,10 @@ package etcd
 
 import (
 	"crypto/tls"
+	"fmt"
 	"log"
 	"net/http"
+	"net/url"
 	"sync"
 	"time"
 
@@ -81,7 +83,7 @@ func New(c *config.Config) *Server {
 		mode: atomicInt(stopMode),
 
 		client:  newClient(tc),
-		peerHub: newPeerHub(c.Peers, client),
+		peerHub: newPeerHub(client),
 
 		stopc: make(chan struct{}),
 	}
@@ -136,7 +138,24 @@ func (s *Server) ServeRaftHTTP(w http.ResponseWriter, r *http.Request) {
 	}
 }
 
-func (s *Server) Run() {
+func (s *Server) Run() error {
+	var d *discoverer
+	var seeds []string
+	durl := s.config.Discovery
+	if durl != "" {
+		u, err := url.Parse(durl)
+		if err != nil {
+			return fmt.Errorf("bad discovery URL error: %v", err)
+		}
+		d = newDiscoverer(u, fmt.Sprint(s.id), s.raftPubAddr)
+		if seeds, err = d.discover(); err != nil {
+			return err
+		}
+	} else {
+		seeds = s.config.Peers
+	}
+	s.peerHub.setSeeds(seeds)
+
 	next := participantMode
 	for {
 		s.mu.Lock()
@@ -146,9 +165,16 @@ func (s *Server) Run() {
 		switch next {
 		case participantMode:
 			s.p = newParticipant(s.id, s.pubAddr, s.raftPubAddr, s.client, s.peerHub, s.tickDuration)
+			dStopc := make(chan struct{})
+			if d != nil {
+				go d.heartbeat(dStopc)
+			}
 			s.mode.Set(participantMode)
 			s.mu.Unlock()
 			next = s.p.run()
+			if d != nil {
+				close(dStopc)
+			}
 		case standbyMode:
 			s.s = newStandby(s.client, s.peerHub)
 			s.mode.Set(standbyMode)
@@ -158,7 +184,7 @@ func (s *Server) Run() {
 			s.mode.Set(stopMode)
 			s.mu.Unlock()
 			s.stopc <- struct{}{}
-			return
+			return nil
 		default:
 			panic("unsupport mode")
 		}

+ 5 - 2
etcd/peer_hub.go

@@ -45,16 +45,19 @@ type peerHub struct {
 	c       *http.Client
 }
 
-func newPeerHub(seeds []string, c *http.Client) *peerHub {
+func newPeerHub(c *http.Client) *peerHub {
 	h := &peerHub{
 		peers: make(map[int64]*peer),
 		seeds: make(map[string]bool),
 		c:     c,
 	}
+	return h
+}
+
+func (h *peerHub) setSeeds(seeds []string) {
 	for _, seed := range seeds {
 		h.seeds[seed] = true
 	}
-	return h
 }
 
 func (h *peerHub) getSeeds() map[string]bool {