participant.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. /*
  2. Copyright 2014 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcd
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "log"
  18. "math/rand"
  19. "net/http"
  20. "os"
  21. "path"
  22. "sync"
  23. "time"
  24. etcdErr "github.com/coreos/etcd/error"
  25. "github.com/coreos/etcd/raft"
  26. "github.com/coreos/etcd/store"
  27. "github.com/coreos/etcd/wal"
  28. )
  // Tuning defaults, store key prefixes and HTTP route prefixes used by the
  // participant.
  const (
	// Raft parameters handed to raft.New / raft.Recover.
	// NOTE(review): these appear to be measured in ticks (add/remove wait
	// 6*defaultHeartbeat tick durations) — confirm against the raft package.
	defaultHeartbeat = 1
	defaultElection  = 5

	// maxBufferedProposal is the capacity of the participant's proposal channel.
	maxBufferedProposal = 128

	// defaultTickDuration is a default raft tick interval (not referenced in
	// this file; the participant receives its tick duration as a parameter).
	defaultTickDuration = 100 * time.Millisecond

	// Store key prefixes. Cluster members are recorded under
	// v2machineKVPrefix/<id>; v2configKVPrefix is not referenced in this file.
	v2machineKVPrefix = "/_etcd/machines"
	v2configKVPrefix  = "/_etcd/config"

	// HTTP route prefixes registered on the participant's ServeMux.
	v2Prefix              = "/v2/keys"
	v2machinePrefix       = "/v2/machines"
	v2peersPrefix         = "/v2/peers"
	v2LeaderPrefix        = "/v2/leader"
	v2StoreStatsPrefix    = "/v2/stats/store"
	v2adminConfigPrefix   = "/v2/admin/config"
	v2adminMachinesPrefix = "/v2/admin/machines/"
  )
  // tmpErr is returned by add/remove when a membership change could not be
  // completed right now: the proposal channel was full, the watch could not be
  // created, an unexpected event arrived, or the wait timed out.
  var tmpErr = fmt.Errorf("try again")

  // stopErr is returned by add/remove when the participant is stopped while
  // they are waiting for the change to be applied.
  var stopErr = fmt.Errorf("server is stopped")

  // raftStopErr reports a stopped raft node (not referenced in this file).
  var raftStopErr = fmt.Errorf("raft is stopped")
  49. type participant struct {
  50. id int64
  51. clusterId int64
  52. pubAddr string
  53. raftPubAddr string
  54. seeds map[string]bool
  55. tickDuration time.Duration
  56. client *v2client
  57. peerHub *peerHub
  58. proposal chan v2Proposal
  59. addNodeC chan raft.Config
  60. removeNodeC chan raft.Config
  61. node *v2Raft
  62. store.Store
  63. rh *raftHandler
  64. w *wal.WAL
  65. stopped bool
  66. mu sync.Mutex
  67. stopc chan struct{}
  68. *http.ServeMux
  69. }
  70. func newParticipant(id int64, pubAddr string, raftPubAddr string, dir string, client *v2client, peerHub *peerHub, tickDuration time.Duration) (*participant, error) {
  71. p := &participant{
  72. clusterId: -1,
  73. tickDuration: tickDuration,
  74. client: client,
  75. peerHub: peerHub,
  76. proposal: make(chan v2Proposal, maxBufferedProposal),
  77. addNodeC: make(chan raft.Config, 1),
  78. removeNodeC: make(chan raft.Config, 1),
  79. node: &v2Raft{
  80. result: make(map[wait]chan interface{}),
  81. },
  82. Store: store.New(),
  83. stopc: make(chan struct{}),
  84. ServeMux: http.NewServeMux(),
  85. }
  86. p.rh = newRaftHandler(peerHub, p.Store.Version())
  87. walPath := path.Join(dir, "wal")
  88. w, err := wal.Open(walPath)
  89. if err != nil {
  90. if !os.IsNotExist(err) {
  91. return nil, err
  92. }
  93. p.id = id
  94. p.pubAddr = pubAddr
  95. p.raftPubAddr = raftPubAddr
  96. if w, err = wal.New(walPath); err != nil {
  97. return nil, err
  98. }
  99. if err = w.SaveInfo(p.id); err != nil {
  100. return nil, err
  101. }
  102. p.node.Node = raft.New(p.id, defaultHeartbeat, defaultElection)
  103. log.Printf("id=%x participant.new path=%s\n", p.id, walPath)
  104. } else {
  105. n, err := w.LoadNode()
  106. if err != nil {
  107. return nil, err
  108. }
  109. p.id = n.Id
  110. p.node.Node = raft.Recover(n.Id, n.Ents, n.State, defaultHeartbeat, defaultElection)
  111. p.apply(p.node.Next())
  112. log.Printf("id=%x participant.load path=%s state=\"%+v\" len(ents)=%d", p.id, walPath, n.State, len(n.Ents))
  113. }
  114. p.w = w
  115. p.Handle(v2Prefix+"/", handlerErr(p.serveValue))
  116. p.Handle(v2machinePrefix, handlerErr(p.serveMachines))
  117. p.Handle(v2peersPrefix, handlerErr(p.serveMachines))
  118. p.Handle(v2LeaderPrefix, handlerErr(p.serveLeader))
  119. p.Handle(v2StoreStatsPrefix, handlerErr(p.serveStoreStats))
  120. p.Handle(v2adminConfigPrefix, handlerErr(p.serveAdminConfig))
  121. p.Handle(v2adminMachinesPrefix, handlerErr(p.serveAdminMachines))
  122. return p, nil
  123. }
  124. func (p *participant) run() int64 {
  125. defer p.w.Close()
  126. if p.node.IsEmpty() {
  127. seeds := p.peerHub.getSeeds()
  128. if len(seeds) == 0 {
  129. log.Printf("id=%x participant.run action=bootstrap\n", p.id)
  130. p.node.Campaign()
  131. p.node.InitCluster(genId())
  132. p.node.Add(p.id, p.raftPubAddr, []byte(p.pubAddr))
  133. p.apply(p.node.Next())
  134. } else {
  135. log.Printf("id=%x participant.run action=join seeds=\"%v\"\n", p.id, seeds)
  136. p.join()
  137. }
  138. }
  139. p.rh.start()
  140. defer p.rh.stop()
  141. node := p.node
  142. defer node.StopProposalWaiters()
  143. recv := p.rh.recv
  144. ticker := time.NewTicker(p.tickDuration)
  145. v2SyncTicker := time.NewTicker(time.Millisecond * 500)
  146. var proposal chan v2Proposal
  147. var addNodeC, removeNodeC chan raft.Config
  148. for {
  149. if node.HasLeader() {
  150. proposal = p.proposal
  151. addNodeC = p.addNodeC
  152. removeNodeC = p.removeNodeC
  153. } else {
  154. proposal = nil
  155. addNodeC = nil
  156. removeNodeC = nil
  157. }
  158. select {
  159. case p := <-proposal:
  160. node.Propose(p)
  161. case c := <-addNodeC:
  162. node.UpdateConf(raft.AddNode, &c)
  163. case c := <-removeNodeC:
  164. node.UpdateConf(raft.RemoveNode, &c)
  165. case msg := <-recv:
  166. node.Step(*msg)
  167. case <-ticker.C:
  168. node.Tick()
  169. case <-v2SyncTicker.C:
  170. node.Sync()
  171. case <-p.stopc:
  172. log.Printf("id=%x participant.stop\n", p.id)
  173. return stopMode
  174. }
  175. p.apply(node.Next())
  176. ents := node.UnstableEnts()
  177. p.save(ents, node.UnstableState())
  178. p.send(node.Msgs())
  179. if node.IsRemoved() {
  180. p.stop()
  181. log.Printf("id=%x participant.end\n", p.id)
  182. return standbyMode
  183. }
  184. }
  185. }
  186. func (p *participant) stop() {
  187. p.mu.Lock()
  188. defer p.mu.Unlock()
  189. if p.stopped {
  190. return
  191. }
  192. p.stopped = true
  193. close(p.stopc)
  194. }
  195. func (p *participant) raftHandler() http.Handler {
  196. return p.rh
  197. }
  198. func (p *participant) add(id int64, raftPubAddr string, pubAddr string) error {
  199. log.Printf("id=%x participant.add nodeId=%x raftPubAddr=%s pubAddr=%s\n", p.id, id, raftPubAddr, pubAddr)
  200. pp := path.Join(v2machineKVPrefix, fmt.Sprint(id))
  201. _, err := p.Get(pp, false, false)
  202. if err == nil {
  203. return nil
  204. }
  205. if v, ok := err.(*etcdErr.Error); !ok || v.ErrorCode != etcdErr.EcodeKeyNotFound {
  206. log.Printf("id=%x participant.add getErr=\"%v\"\n", p.id, err)
  207. return err
  208. }
  209. w, err := p.Watch(pp, true, false, 0)
  210. if err != nil {
  211. log.Printf("id=%x participant.add watchErr=\"%v\"\n", p.id, err)
  212. return tmpErr
  213. }
  214. select {
  215. case p.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}:
  216. default:
  217. w.Remove()
  218. log.Printf("id=%x participant.add proposeErr=\"unable to send out addNode proposal\"\n", p.id)
  219. return tmpErr
  220. }
  221. select {
  222. case v := <-w.EventChan:
  223. if v.Action == store.Set {
  224. return nil
  225. }
  226. log.Printf("id=%x participant.add watchErr=\"unexpected action\" action=%s\n", p.id, v.Action)
  227. return tmpErr
  228. case <-time.After(6 * defaultHeartbeat * p.tickDuration):
  229. w.Remove()
  230. log.Printf("id=%x participant.add watchErr=timeout\n", p.id)
  231. return tmpErr
  232. case <-p.stopc:
  233. return stopErr
  234. }
  235. }
  236. func (p *participant) remove(id int64) error {
  237. log.Printf("id=%x participant.remove nodeId=%x\n", p.id, id)
  238. pp := path.Join(v2machineKVPrefix, fmt.Sprint(id))
  239. v, err := p.Get(pp, false, false)
  240. if err != nil {
  241. return nil
  242. }
  243. select {
  244. case p.removeNodeC <- raft.Config{NodeId: id}:
  245. default:
  246. log.Printf("id=%x participant.remove proposeErr=\"unable to send out removeNode proposal\"\n", p.id)
  247. return tmpErr
  248. }
  249. // TODO(xiangli): do not need to watch if the
  250. // removal target is self
  251. w, err := p.Watch(pp, true, false, v.Index()+1)
  252. if err != nil {
  253. log.Printf("id=%x participant.remove watchErr=\"%v\"\n", p.id, err)
  254. return tmpErr
  255. }
  256. select {
  257. case v := <-w.EventChan:
  258. if v.Action == store.Delete {
  259. return nil
  260. }
  261. log.Printf("id=%x participant.remove watchErr=\"unexpected action\" action=%s\n", p.id, v.Action)
  262. return tmpErr
  263. case <-time.After(6 * defaultHeartbeat * p.tickDuration):
  264. w.Remove()
  265. log.Printf("id=%x participant.remove watchErr=timeout\n", p.id)
  266. return tmpErr
  267. case <-p.stopc:
  268. return stopErr
  269. }
  270. }
  271. func (p *participant) apply(ents []raft.Entry) {
  272. offset := p.node.Applied() - int64(len(ents)) + 1
  273. for i, ent := range ents {
  274. switch ent.Type {
  275. // expose raft entry type
  276. case raft.Normal:
  277. if len(ent.Data) == 0 {
  278. continue
  279. }
  280. p.v2apply(offset+int64(i), ent)
  281. case raft.ClusterInit:
  282. p.clusterId = p.node.ClusterId()
  283. log.Printf("id=%x participant.cluster.setId clusterId=%x\n", p.id, p.clusterId)
  284. case raft.AddNode:
  285. cfg := new(raft.Config)
  286. if err := json.Unmarshal(ent.Data, cfg); err != nil {
  287. log.Printf("id=%x participant.cluster.addNode unmarshalErr=\"%v\"\n", p.id, err)
  288. break
  289. }
  290. peer, err := p.peerHub.add(cfg.NodeId, cfg.Addr)
  291. if err != nil {
  292. log.Printf("id=%x participant.cluster.addNode peerAddErr=\"%v\"\n", p.id, err)
  293. break
  294. }
  295. peer.participate()
  296. pp := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
  297. p.Store.Set(pp, false, fmt.Sprintf("raft=%v&etcd=%v", cfg.Addr, string(cfg.Context)), store.Permanent)
  298. if p.id == cfg.NodeId {
  299. p.raftPubAddr = cfg.Addr
  300. p.pubAddr = string(cfg.Context)
  301. }
  302. log.Printf("id=%x participant.cluster.addNode nodeId=%x addr=%s context=%s\n", p.id, cfg.NodeId, cfg.Addr, cfg.Context)
  303. case raft.RemoveNode:
  304. cfg := new(raft.Config)
  305. if err := json.Unmarshal(ent.Data, cfg); err != nil {
  306. log.Printf("id=%x participant.cluster.removeNode unmarshalErr=\"%v\"\n", p.id, err)
  307. break
  308. }
  309. peer, err := p.peerHub.peer(cfg.NodeId)
  310. if err != nil {
  311. log.Fatal("id=%x participant.apply getPeerErr=\"%v\"", p.id, err)
  312. }
  313. peer.idle()
  314. pp := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
  315. p.Store.Delete(pp, false, false)
  316. log.Printf("id=%x participant.cluster.removeNode nodeId=%x\n", p.id, cfg.NodeId)
  317. default:
  318. panic("unimplemented")
  319. }
  320. }
  321. }
  322. func (p *participant) save(ents []raft.Entry, state raft.State) {
  323. for _, ent := range ents {
  324. if err := p.w.SaveEntry(&ent); err != nil {
  325. log.Panicf("id=%x participant.save saveEntryErr=%q", p.id, err)
  326. }
  327. }
  328. if state != raft.EmptyState {
  329. if err := p.w.SaveState(&state); err != nil {
  330. log.Panicf("id=%x participant.save saveStateErr=%q", p.id, err)
  331. }
  332. }
  333. if err := p.w.Sync(); err != nil {
  334. log.Panicf("id=%x participant.save syncErr=%q", p.id, err)
  335. }
  336. }
  337. func (p *participant) send(msgs []raft.Message) {
  338. for i := range msgs {
  339. if err := p.peerHub.send(msgs[i]); err != nil {
  340. log.Printf("id=%x participant.send err=\"%v\"\n", p.id, err)
  341. }
  342. }
  343. }
  344. func (p *participant) join() {
  345. info := &context{
  346. MinVersion: store.MinVersion(),
  347. MaxVersion: store.MaxVersion(),
  348. ClientURL: p.pubAddr,
  349. PeerURL: p.raftPubAddr,
  350. }
  351. for {
  352. for seed := range p.peerHub.getSeeds() {
  353. if err := p.client.AddMachine(seed, fmt.Sprint(p.id), info); err == nil {
  354. return
  355. } else {
  356. log.Printf("id=%x participant.join addMachineErr=\"%v\"\n", p.id, err)
  357. }
  358. }
  359. time.Sleep(100 * time.Millisecond)
  360. }
  361. }
  // genId returns a random non-negative int64; run uses it as the id of a newly
  // bootstrapped cluster. The generator is seeded with the full Unix time in
  // nanoseconds. Seeding with only the sub-second part would allow just 1e9
  // distinct seeds, repeating every second.
  func genId() int64 {
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	return r.Int63()
}