Bläddra i källkod

etcd: add clusterid to participant

Xiang Li 11 år sedan
förälder
incheckning
155bd09902
2 ändrade filer med 18 tillägg och 0 borttagningar
  1. 7 0
      etcd/etcd_test.go
  2. 11 0
      etcd/participant.go

+ 7 - 0
etcd/etcd_test.go

@@ -416,6 +416,13 @@ func waitCluster(t *testing.T, es []*Server) {
 			}
 			}
 		}
 		}
 	}
 	}
+
+	clusterId := es[0].p.node.ClusterId()
+	for i, e := range es {
+		if e.p.node.ClusterId() != clusterId {
+			t.Errorf("#%d: clusterId = %x, want %x", i, e.p.node.ClusterId(), clusterId)
+		}
+	}
 }
 }
 
 
 func waitMode(mode int64, e *Server) {
 func waitMode(mode int64, e *Server) {

+ 11 - 0
etcd/participant.go

@@ -20,6 +20,7 @@ import (
 	"encoding/json"
 	"encoding/json"
 	"fmt"
 	"fmt"
 	"log"
 	"log"
+	"math/rand"
 	"net/http"
 	"net/http"
 	"path"
 	"path"
 	"sync"
 	"sync"
@@ -58,6 +59,7 @@ var (
 
 
 type participant struct {
 type participant struct {
 	id           int64
 	id           int64
+	clusterId    int64
 	pubAddr      string
 	pubAddr      string
 	raftPubAddr  string
 	raftPubAddr  string
 	seeds        map[string]bool
 	seeds        map[string]bool
@@ -83,6 +85,7 @@ type participant struct {
 func newParticipant(id int64, pubAddr string, raftPubAddr string, client *v2client, peerHub *peerHub, tickDuration time.Duration) *participant {
 func newParticipant(id int64, pubAddr string, raftPubAddr string, client *v2client, peerHub *peerHub, tickDuration time.Duration) *participant {
 	p := &participant{
 	p := &participant{
 		id:           id,
 		id:           id,
+		clusterId:    -1,
 		pubAddr:      pubAddr,
 		pubAddr:      pubAddr,
 		raftPubAddr:  raftPubAddr,
 		raftPubAddr:  raftPubAddr,
 		tickDuration: tickDuration,
 		tickDuration: tickDuration,
@@ -120,6 +123,7 @@ func (p *participant) run() int64 {
 	if len(seeds) == 0 {
 	if len(seeds) == 0 {
 		log.Println("starting a bootstrap node")
 		log.Println("starting a bootstrap node")
 		p.node.Campaign()
 		p.node.Campaign()
+		p.node.InitCluster(genId())
 		p.node.Add(p.id, p.raftPubAddr, []byte(p.pubAddr))
 		p.node.Add(p.id, p.raftPubAddr, []byte(p.pubAddr))
 		p.apply(p.node.Next())
 		p.apply(p.node.Next())
 	} else {
 	} else {
@@ -280,6 +284,8 @@ func (p *participant) apply(ents []raft.Entry) {
 				continue
 				continue
 			}
 			}
 			p.v2apply(offset+int64(i), ent)
 			p.v2apply(offset+int64(i), ent)
+		case raft.ClusterInit:
+			p.clusterId = p.node.ClusterId()
 		case raft.AddNode:
 		case raft.AddNode:
 			cfg := new(raft.Config)
 			cfg := new(raft.Config)
 			if err := json.Unmarshal(ent.Data, cfg); err != nil {
 			if err := json.Unmarshal(ent.Data, cfg); err != nil {
@@ -343,3 +349,8 @@ func (p *participant) join() {
 	}
 	}
 	log.Println("fail to join the cluster")
 	log.Println("fail to join the cluster")
 }
 }
+
+func genId() int64 {
+	r := rand.New(rand.NewSource(int64(time.Now().Nanosecond())))
+	return r.Int63()
+}