participant.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. /*
  2. Copyright 2014 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcd
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "log"
  18. "math/rand"
  19. "net/http"
  20. "path"
  21. "sync"
  22. "time"
  23. etcdErr "github.com/coreos/etcd/error"
  24. "github.com/coreos/etcd/raft"
  25. "github.com/coreos/etcd/store"
  26. )
  // Raft and buffering defaults used when constructing a participant.
  const (
  	// defaultHeartbeat and defaultElection are the heartbeat and election
  	// arguments handed to raft.New by newParticipant.
  	defaultHeartbeat = 1
  	defaultElection  = 5

  	// maxBufferedProposal is the capacity of the participant's proposal channel.
  	maxBufferedProposal = 128

  	// defaultTickDuration is a 100ms tick interval.
  	defaultTickDuration = 100 * time.Millisecond
  )

  // Key prefixes under which cluster metadata is kept in the store.
  const (
  	v2machineKVPrefix = "/_etcd/machines"
  	v2configKVPrefix  = "/_etcd/config"
  )

  // URL prefixes of the v2 HTTP API routes registered by newParticipant.
  const (
  	v2Prefix              = "/v2/keys"
  	v2machinePrefix       = "/v2/machines"
  	v2peersPrefix         = "/v2/peers"
  	v2LeaderPrefix        = "/v2/leader"
  	v2StoreStatsPrefix    = "/v2/stats/store"
  	v2adminConfigPrefix   = "/v2/admin/config"
  	v2adminMachinesPrefix = "/v2/admin/machines/"
  )
  // tmpErr tells the caller that the operation did not complete and may be
  // retried; add and remove return it on timeouts and full channels.
  var tmpErr = fmt.Errorf("try again")

  // stopErr is returned by add and remove when the participant is stopped
  // while they are waiting.
  var stopErr = fmt.Errorf("server is stopped")

  // raftStopErr reports that raft is stopped.
  var raftStopErr = fmt.Errorf("raft is stopped")
  47. type participant struct {
  48. id int64
  49. clusterId int64
  50. pubAddr string
  51. raftPubAddr string
  52. seeds map[string]bool
  53. tickDuration time.Duration
  54. client *v2client
  55. peerHub *peerHub
  56. proposal chan v2Proposal
  57. addNodeC chan raft.Config
  58. removeNodeC chan raft.Config
  59. node *v2Raft
  60. store.Store
  61. rh *raftHandler
  62. stopped bool
  63. mu sync.Mutex
  64. stopc chan struct{}
  65. *http.ServeMux
  66. }
  67. func newParticipant(id int64, pubAddr string, raftPubAddr string, client *v2client, peerHub *peerHub, tickDuration time.Duration) *participant {
  68. p := &participant{
  69. id: id,
  70. clusterId: -1,
  71. pubAddr: pubAddr,
  72. raftPubAddr: raftPubAddr,
  73. tickDuration: tickDuration,
  74. client: client,
  75. peerHub: peerHub,
  76. proposal: make(chan v2Proposal, maxBufferedProposal),
  77. addNodeC: make(chan raft.Config, 1),
  78. removeNodeC: make(chan raft.Config, 1),
  79. node: &v2Raft{
  80. Node: raft.New(id, defaultHeartbeat, defaultElection),
  81. result: make(map[wait]chan interface{}),
  82. },
  83. Store: store.New(),
  84. rh: newRaftHandler(peerHub),
  85. stopc: make(chan struct{}),
  86. ServeMux: http.NewServeMux(),
  87. }
  88. p.Handle(v2Prefix+"/", handlerErr(p.serveValue))
  89. p.Handle(v2machinePrefix, handlerErr(p.serveMachines))
  90. p.Handle(v2peersPrefix, handlerErr(p.serveMachines))
  91. p.Handle(v2LeaderPrefix, handlerErr(p.serveLeader))
  92. p.Handle(v2StoreStatsPrefix, handlerErr(p.serveStoreStats))
  93. p.Handle(v2adminConfigPrefix, handlerErr(p.serveAdminConfig))
  94. p.Handle(v2adminMachinesPrefix, handlerErr(p.serveAdminMachines))
  95. return p
  96. }
  97. func (p *participant) run() int64 {
  98. seeds := p.peerHub.getSeeds()
  99. if len(seeds) == 0 {
  100. log.Println("starting a bootstrap node")
  101. p.node.Campaign()
  102. p.node.InitCluster(genId())
  103. p.node.Add(p.id, p.raftPubAddr, []byte(p.pubAddr))
  104. p.apply(p.node.Next())
  105. } else {
  106. log.Println("joining cluster via peers", seeds)
  107. p.join()
  108. }
  109. p.rh.start()
  110. defer p.rh.stop()
  111. node := p.node
  112. defer node.StopProposalWaiters()
  113. recv := p.rh.recv
  114. ticker := time.NewTicker(p.tickDuration)
  115. v2SyncTicker := time.NewTicker(time.Millisecond * 500)
  116. var proposal chan v2Proposal
  117. var addNodeC, removeNodeC chan raft.Config
  118. for {
  119. if node.HasLeader() {
  120. proposal = p.proposal
  121. addNodeC = p.addNodeC
  122. removeNodeC = p.removeNodeC
  123. } else {
  124. proposal = nil
  125. addNodeC = nil
  126. removeNodeC = nil
  127. }
  128. select {
  129. case p := <-proposal:
  130. node.Propose(p)
  131. case c := <-addNodeC:
  132. node.UpdateConf(raft.AddNode, &c)
  133. case c := <-removeNodeC:
  134. node.UpdateConf(raft.RemoveNode, &c)
  135. case msg := <-recv:
  136. node.Step(*msg)
  137. case <-ticker.C:
  138. node.Tick()
  139. case <-v2SyncTicker.C:
  140. node.Sync()
  141. case <-p.stopc:
  142. log.Printf("Participant %x stopped\n", p.id)
  143. return stopMode
  144. }
  145. p.apply(node.Next())
  146. p.send(node.Msgs())
  147. if node.IsRemoved() {
  148. log.Printf("Participant %x return\n", p.id)
  149. p.stop()
  150. return standbyMode
  151. }
  152. }
  153. }
  154. func (p *participant) stop() {
  155. p.mu.Lock()
  156. defer p.mu.Unlock()
  157. if p.stopped {
  158. return
  159. }
  160. p.stopped = true
  161. close(p.stopc)
  162. }
  163. func (p *participant) raftHandler() http.Handler {
  164. return p.rh
  165. }
  166. func (p *participant) add(id int64, raftPubAddr string, pubAddr string) error {
  167. pp := path.Join(v2machineKVPrefix, fmt.Sprint(id))
  168. _, err := p.Get(pp, false, false)
  169. if err == nil {
  170. return nil
  171. }
  172. if v, ok := err.(*etcdErr.Error); !ok || v.ErrorCode != etcdErr.EcodeKeyNotFound {
  173. return err
  174. }
  175. w, err := p.Watch(pp, true, false, 0)
  176. if err != nil {
  177. log.Println("add error:", err)
  178. return tmpErr
  179. }
  180. select {
  181. case p.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}:
  182. default:
  183. w.Remove()
  184. log.Println("unable to send out addNode proposal")
  185. return tmpErr
  186. }
  187. select {
  188. case v := <-w.EventChan:
  189. if v.Action == store.Set {
  190. return nil
  191. }
  192. log.Println("add error: action =", v.Action)
  193. return tmpErr
  194. case <-time.After(6 * defaultHeartbeat * p.tickDuration):
  195. w.Remove()
  196. log.Println("add error: wait timeout")
  197. return tmpErr
  198. case <-p.stopc:
  199. return stopErr
  200. }
  201. }
  202. func (p *participant) remove(id int64) error {
  203. pp := path.Join(v2machineKVPrefix, fmt.Sprint(id))
  204. v, err := p.Get(pp, false, false)
  205. if err != nil {
  206. return nil
  207. }
  208. select {
  209. case p.removeNodeC <- raft.Config{NodeId: id}:
  210. default:
  211. log.Println("unable to send out removeNode proposal")
  212. return tmpErr
  213. }
  214. // TODO(xiangli): do not need to watch if the
  215. // removal target is self
  216. w, err := p.Watch(pp, true, false, v.Index()+1)
  217. if err != nil {
  218. log.Println("remove error:", err)
  219. return tmpErr
  220. }
  221. select {
  222. case v := <-w.EventChan:
  223. if v.Action == store.Delete {
  224. return nil
  225. }
  226. log.Println("remove error: action =", v.Action)
  227. return tmpErr
  228. case <-time.After(6 * defaultHeartbeat * p.tickDuration):
  229. w.Remove()
  230. log.Println("remove error: wait timeout")
  231. return tmpErr
  232. case <-p.stopc:
  233. return stopErr
  234. }
  235. }
  236. func (p *participant) apply(ents []raft.Entry) {
  237. offset := p.node.Applied() - int64(len(ents)) + 1
  238. for i, ent := range ents {
  239. switch ent.Type {
  240. // expose raft entry type
  241. case raft.Normal:
  242. if len(ent.Data) == 0 {
  243. continue
  244. }
  245. p.v2apply(offset+int64(i), ent)
  246. case raft.ClusterInit:
  247. p.clusterId = p.node.ClusterId()
  248. case raft.AddNode:
  249. cfg := new(raft.Config)
  250. if err := json.Unmarshal(ent.Data, cfg); err != nil {
  251. log.Println(err)
  252. break
  253. }
  254. peer, err := p.peerHub.add(cfg.NodeId, cfg.Addr)
  255. if err != nil {
  256. log.Println(err)
  257. break
  258. }
  259. peer.participate()
  260. log.Printf("Add Node %x %v %v\n", cfg.NodeId, cfg.Addr, string(cfg.Context))
  261. pp := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
  262. p.Store.Set(pp, false, fmt.Sprintf("raft=%v&etcd=%v", cfg.Addr, string(cfg.Context)), store.Permanent)
  263. case raft.RemoveNode:
  264. cfg := new(raft.Config)
  265. if err := json.Unmarshal(ent.Data, cfg); err != nil {
  266. log.Println(err)
  267. break
  268. }
  269. log.Printf("Remove Node %x\n", cfg.NodeId)
  270. peer, err := p.peerHub.peer(cfg.NodeId)
  271. if err != nil {
  272. log.Fatal("cannot get the added peer:", err)
  273. }
  274. peer.idle()
  275. pp := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
  276. p.Store.Delete(pp, false, false)
  277. default:
  278. panic("unimplemented")
  279. }
  280. }
  281. }
  282. func (p *participant) send(msgs []raft.Message) {
  283. for i := range msgs {
  284. if err := p.peerHub.send(msgs[i]); err != nil {
  285. log.Println("send:", err)
  286. }
  287. }
  288. }
  289. func (p *participant) join() {
  290. info := &context{
  291. MinVersion: store.MinVersion(),
  292. MaxVersion: store.MaxVersion(),
  293. ClientURL: p.pubAddr,
  294. PeerURL: p.raftPubAddr,
  295. }
  296. for {
  297. for seed := range p.peerHub.getSeeds() {
  298. if err := p.client.AddMachine(seed, fmt.Sprint(p.id), info); err == nil {
  299. return
  300. } else {
  301. log.Println(err)
  302. }
  303. }
  304. time.Sleep(100 * time.Millisecond)
  305. }
  306. log.Println("fail to join the cluster")
  307. }
  // genId returns a non-negative pseudo-random 63-bit id.
  //
  // The generator is seeded from the full wall-clock timestamp in
  // nanoseconds. Seeding from only the sub-second nanosecond offset would
  // limit the seed, and therefore the set of possible ids, to about 1e9
  // values.
  func genId() int64 {
  	r := rand.New(rand.NewSource(time.Now().UnixNano()))
  	return r.Int63()
  }