participant.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494
  1. /*
  2. Copyright 2014 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcd
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "log"
  18. "math/rand"
  19. "net/http"
  20. "os"
  21. "path"
  22. "time"
  23. "github.com/coreos/etcd/conf"
  24. etcdErr "github.com/coreos/etcd/error"
  25. "github.com/coreos/etcd/raft"
  26. "github.com/coreos/etcd/snap"
  27. "github.com/coreos/etcd/store"
  28. "github.com/coreos/etcd/wal"
  29. )
  const (
  	// Raft timing parameters handed to raft.New / raft.Recover.
  	defaultHeartbeat = 1
  	defaultElection = 5

  	// run compacts the raft log into a snapshot once it holds more than
  	// this many entries.
  	defaultCompact = 10000

  	// Capacity of the participant's proposal channel.
  	maxBufferedProposal = 128

  	defaultTickDuration = 100 * time.Millisecond

  	// Store key prefixes.
  	v2machineKVPrefix = "/_etcd/machines"
  	v2configKVPrefix = "/_etcd/config"

  	// HTTP route prefixes for the v2 client API.
  	v2Prefix = "/v2/keys"
  	v2machinePrefix = "/v2/machines"
  	v2peersPrefix = "/v2/peers"
  	v2LeaderPrefix = "/v2/leader"
  	v2SelfStatsPrefix = "/v2/stats/self"
  	v2LeaderStatsPrefix = "/v2/stats/leader"
  	v2StoreStatsPrefix = "/v2/stats/store"

  	// HTTP route prefixes for the admin API.
  	v2adminConfigPrefix = "/v2/admin/config"
  	v2adminMachinesPrefix = "/v2/admin/machines/"
  )
  // tmpErr is returned by add and remove when the request could not be
  // completed now (queue full, watch failure, unexpected event, timeout).
  var tmpErr = fmt.Errorf("try again")

  // stopErr is returned by add and remove when the participant stops while
  // they are waiting.
  var stopErr = fmt.Errorf("server is stopped")

  // raftStopErr reports that raft is no longer running.
  var raftStopErr = fmt.Errorf("raft is stopped")
  53. type participant struct {
  54. id int64
  55. clusterId int64
  56. cfg *conf.Config
  57. pubAddr string
  58. raftPubAddr string
  59. tickDuration time.Duration
  60. client *v2client
  61. peerHub *peerHub
  62. proposal chan v2Proposal
  63. addNodeC chan raft.Config
  64. removeNodeC chan raft.Config
  65. node *v2Raft
  66. store.Store
  67. rh *raftHandler
  68. w *wal.WAL
  69. snapshotter *snap.Snapshotter
  70. serverStats *raftServerStats
  71. stopNotifyc chan struct{}
  72. *http.ServeMux
  73. }
  74. func newParticipant(c *conf.Config, client *v2client, peerHub *peerHub, tickDuration time.Duration) (*participant, error) {
  75. p := &participant{
  76. clusterId: -1,
  77. cfg: c,
  78. tickDuration: tickDuration,
  79. client: client,
  80. peerHub: peerHub,
  81. proposal: make(chan v2Proposal, maxBufferedProposal),
  82. addNodeC: make(chan raft.Config, 1),
  83. removeNodeC: make(chan raft.Config, 1),
  84. node: &v2Raft{
  85. result: make(map[wait]chan interface{}),
  86. },
  87. Store: store.New(),
  88. serverStats: NewRaftServerStats(c.Name),
  89. stopNotifyc: make(chan struct{}),
  90. ServeMux: http.NewServeMux(),
  91. }
  92. p.rh = newRaftHandler(peerHub, p.Store.Version(), p.serverStats)
  93. p.peerHub.setServerStats(p.serverStats)
  94. snapDir := path.Join(p.cfg.DataDir, "snap")
  95. if err := os.Mkdir(snapDir, 0700); err != nil {
  96. if !os.IsExist(err) {
  97. log.Printf("id=%x participant.new.mkdir err=%v", p.id, err)
  98. return nil, err
  99. }
  100. }
  101. p.snapshotter = snap.New(snapDir)
  102. walDir := path.Join(p.cfg.DataDir, "wal")
  103. if err := os.Mkdir(walDir, 0700); err != nil {
  104. if !os.IsExist(err) {
  105. log.Printf("id=%x participant.new.mkdir err=%v", p.id, err)
  106. return nil, err
  107. }
  108. }
  109. var w *wal.WAL
  110. var err error
  111. if !wal.Exist(walDir) {
  112. p.id = genId()
  113. p.pubAddr = c.Addr
  114. p.raftPubAddr = c.Peer.Addr
  115. if w, err = wal.Create(walDir); err != nil {
  116. return nil, err
  117. }
  118. p.node.Node = raft.New(p.id, defaultHeartbeat, defaultElection)
  119. info := p.node.Info()
  120. if err = w.SaveInfo(&info); err != nil {
  121. return nil, err
  122. }
  123. log.Printf("id=%x participant.new path=%s\n", p.id, walDir)
  124. } else {
  125. s, err := p.snapshotter.Load()
  126. if err != nil && err != snap.ErrNoSnapshot {
  127. log.Printf("id=%x participant.snapload err=%s\n", p.id, err)
  128. return nil, err
  129. }
  130. var snapIndex int64
  131. if s != nil {
  132. if err := p.Recovery(s.Data); err != nil {
  133. panic(err)
  134. }
  135. log.Printf("id=%x participant.store.recovered index=%d\n", p.id, s.Index)
  136. snapIndex = s.Index
  137. }
  138. n, err := wal.Read(walDir, snapIndex)
  139. if err != nil {
  140. return nil, err
  141. }
  142. p.id = n.Id
  143. p.node.Node = raft.Recover(n.Id, s, n.Ents, n.State, defaultHeartbeat, defaultElection)
  144. p.apply(p.node.Next())
  145. log.Printf("id=%x participant.load path=%s snapIndex=%d state=\"%+v\" len(ents)=%d", p.id, p.cfg.DataDir, snapIndex, n.State, len(n.Ents))
  146. if w, err = wal.Open(walDir); err != nil {
  147. return nil, err
  148. }
  149. }
  150. p.w = w
  151. p.Handle(v2Prefix+"/", handlerErr(p.serveValue))
  152. p.Handle(v2machinePrefix, handlerErr(p.serveMachines))
  153. p.Handle(v2peersPrefix, handlerErr(p.serveMachines))
  154. p.Handle(v2LeaderPrefix, handlerErr(p.serveLeader))
  155. p.Handle(v2SelfStatsPrefix, handlerErr(p.serveSelfStats))
  156. p.Handle(v2LeaderStatsPrefix, handlerErr(p.serveLeaderStats))
  157. p.Handle(v2StoreStatsPrefix, handlerErr(p.serveStoreStats))
  158. p.rh.Handle(v2adminConfigPrefix, handlerErr(p.serveAdminConfig))
  159. p.rh.Handle(v2adminMachinesPrefix, handlerErr(p.serveAdminMachines))
  160. // TODO: remind to set application/json for /v2/stats endpoint
  161. return p, nil
  162. }
  163. func (p *participant) run(stop chan struct{}) {
  164. defer p.cleanup()
  165. if p.node.IsEmpty() {
  166. seeds := p.peerHub.getSeeds()
  167. if len(seeds) == 0 {
  168. log.Printf("id=%x participant.run action=bootstrap\n", p.id)
  169. p.node.Campaign()
  170. p.node.InitCluster(genId())
  171. p.node.Add(p.id, p.raftPubAddr, []byte(p.pubAddr))
  172. p.apply(p.node.Next())
  173. } else {
  174. log.Printf("id=%x participant.run action=join seeds=\"%v\"\n", p.id, seeds)
  175. if err := p.join(); err != nil {
  176. log.Fatalf("id=%x participant.join err=%q", p.id, err)
  177. }
  178. }
  179. }
  180. p.rh.start()
  181. defer p.rh.stop()
  182. node := p.node
  183. recv := p.rh.recv
  184. ticker := time.NewTicker(p.tickDuration)
  185. defer ticker.Stop()
  186. v2SyncTicker := time.NewTicker(time.Millisecond * 500)
  187. defer v2SyncTicker.Stop()
  188. var proposal chan v2Proposal
  189. var addNodeC, removeNodeC chan raft.Config
  190. for {
  191. if node.HasLeader() {
  192. proposal = p.proposal
  193. addNodeC = p.addNodeC
  194. removeNodeC = p.removeNodeC
  195. } else {
  196. proposal = nil
  197. addNodeC = nil
  198. removeNodeC = nil
  199. }
  200. select {
  201. case p := <-proposal:
  202. node.Propose(p)
  203. case c := <-addNodeC:
  204. node.UpdateConf(raft.AddNode, &c)
  205. case c := <-removeNodeC:
  206. node.UpdateConf(raft.RemoveNode, &c)
  207. case msg := <-recv:
  208. node.Step(*msg)
  209. case <-ticker.C:
  210. node.Tick()
  211. case <-v2SyncTicker.C:
  212. node.Sync()
  213. case <-stop:
  214. log.Printf("id=%x participant.stop\n", p.id)
  215. return
  216. }
  217. if s := node.UnstableSnapshot(); !s.IsEmpty() {
  218. if err := p.Recovery(s.Data); err != nil {
  219. panic(err)
  220. }
  221. log.Printf("id=%x recovered index=%d\n", p.id, s.Index)
  222. }
  223. p.apply(node.Next())
  224. ents := node.UnstableEnts()
  225. p.save(ents, node.UnstableState())
  226. p.send(node.Msgs())
  227. if node.IsRemoved() {
  228. log.Printf("id=%x participant.end\n", p.id)
  229. return
  230. }
  231. if p.node.EntsLen() > defaultCompact {
  232. d, err := p.Save()
  233. if err != nil {
  234. panic(err)
  235. }
  236. p.node.Compact(d)
  237. snap := p.node.GetSnap()
  238. log.Printf("id=%x compacted index=%d", p.id, snap.Index)
  239. if err := p.snapshotter.Save(&snap); err != nil {
  240. log.Printf("id=%d snapshot err=%v", p.id, err)
  241. // todo(xiangli): consume the error?
  242. panic(err)
  243. }
  244. if err := p.w.Cut(p.node.Index()); err != nil {
  245. log.Printf("id=%d wal.cut err=%v", p.id, err)
  246. // todo(xiangli): consume the error?
  247. panic(err)
  248. }
  249. info := p.node.Info()
  250. if err = p.w.SaveInfo(&info); err != nil {
  251. log.Printf("id=%d wal.saveInfo err=%v", p.id, err)
  252. // todo(xiangli): consume the error?
  253. panic(err)
  254. }
  255. }
  256. }
  257. }
  258. func (p *participant) cleanup() {
  259. p.w.Close()
  260. close(p.stopNotifyc)
  261. p.peerHub.stop()
  262. }
  263. func (p *participant) raftHandler() http.Handler {
  264. return p.rh
  265. }
  266. func (p *participant) add(id int64, raftPubAddr string, pubAddr string) error {
  267. log.Printf("id=%x participant.add nodeId=%x raftPubAddr=%s pubAddr=%s\n", p.id, id, raftPubAddr, pubAddr)
  268. pp := path.Join(v2machineKVPrefix, fmt.Sprint(id))
  269. _, err := p.Store.Get(pp, false, false)
  270. if err == nil {
  271. return nil
  272. }
  273. if v, ok := err.(*etcdErr.Error); !ok || v.ErrorCode != etcdErr.EcodeKeyNotFound {
  274. log.Printf("id=%x participant.add getErr=\"%v\"\n", p.id, err)
  275. return err
  276. }
  277. w, err := p.Watch(pp, true, false, 0)
  278. if err != nil {
  279. log.Printf("id=%x participant.add watchErr=\"%v\"\n", p.id, err)
  280. return tmpErr
  281. }
  282. select {
  283. case p.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}:
  284. default:
  285. w.Remove()
  286. log.Printf("id=%x participant.add proposeErr=\"unable to send out addNode proposal\"\n", p.id)
  287. return tmpErr
  288. }
  289. select {
  290. case v := <-w.EventChan:
  291. if v.Action == store.Set {
  292. return nil
  293. }
  294. log.Printf("id=%x participant.add watchErr=\"unexpected action\" action=%s\n", p.id, v.Action)
  295. return tmpErr
  296. case <-time.After(6 * defaultHeartbeat * p.tickDuration):
  297. w.Remove()
  298. log.Printf("id=%x participant.add watchErr=timeout\n", p.id)
  299. return tmpErr
  300. case <-p.stopNotifyc:
  301. return stopErr
  302. }
  303. }
  304. func (p *participant) remove(id int64) error {
  305. log.Printf("id=%x participant.remove nodeId=%x\n", p.id, id)
  306. pp := path.Join(v2machineKVPrefix, fmt.Sprint(id))
  307. v, err := p.Store.Get(pp, false, false)
  308. if err != nil {
  309. return nil
  310. }
  311. select {
  312. case p.removeNodeC <- raft.Config{NodeId: id}:
  313. default:
  314. log.Printf("id=%x participant.remove proposeErr=\"unable to send out removeNode proposal\"\n", p.id)
  315. return tmpErr
  316. }
  317. // TODO(xiangli): do not need to watch if the
  318. // removal target is self
  319. w, err := p.Watch(pp, true, false, v.Index()+1)
  320. if err != nil {
  321. log.Printf("id=%x participant.remove watchErr=\"%v\"\n", p.id, err)
  322. return tmpErr
  323. }
  324. select {
  325. case v := <-w.EventChan:
  326. if v.Action == store.Delete {
  327. return nil
  328. }
  329. log.Printf("id=%x participant.remove watchErr=\"unexpected action\" action=%s\n", p.id, v.Action)
  330. return tmpErr
  331. case <-time.After(6 * defaultHeartbeat * p.tickDuration):
  332. w.Remove()
  333. log.Printf("id=%x participant.remove watchErr=timeout\n", p.id)
  334. return tmpErr
  335. case <-p.stopNotifyc:
  336. return stopErr
  337. }
  338. }
  339. func (p *participant) apply(ents []raft.Entry) {
  340. offset := p.node.Applied() - int64(len(ents)) + 1
  341. for i, ent := range ents {
  342. switch ent.Type {
  343. // expose raft entry type
  344. case raft.Normal:
  345. if len(ent.Data) == 0 {
  346. continue
  347. }
  348. p.v2apply(offset+int64(i), ent)
  349. case raft.ClusterInit:
  350. p.clusterId = p.node.ClusterId()
  351. log.Printf("id=%x participant.cluster.setId clusterId=%x\n", p.id, p.clusterId)
  352. case raft.AddNode:
  353. cfg := new(raft.Config)
  354. if err := json.Unmarshal(ent.Data, cfg); err != nil {
  355. log.Printf("id=%x participant.cluster.addNode unmarshalErr=\"%v\"\n", p.id, err)
  356. break
  357. }
  358. pp := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
  359. if ev, _ := p.Store.Get(pp, false, false); ev != nil {
  360. log.Printf("id=%x participant.cluster.addNode err=existed value=%q", p.id, *ev.Node.Value)
  361. break
  362. }
  363. peer, err := p.peerHub.add(cfg.NodeId, cfg.Addr)
  364. if err != nil {
  365. log.Printf("id=%x participant.cluster.addNode peerAddErr=\"%v\"\n", p.id, err)
  366. break
  367. }
  368. peer.participate()
  369. p.Store.Set(pp, false, fmt.Sprintf("raft=%v&etcd=%v", cfg.Addr, string(cfg.Context)), store.Permanent)
  370. if p.id == cfg.NodeId {
  371. p.raftPubAddr = cfg.Addr
  372. p.pubAddr = string(cfg.Context)
  373. }
  374. log.Printf("id=%x participant.cluster.addNode nodeId=%x addr=%s context=%s\n", p.id, cfg.NodeId, cfg.Addr, cfg.Context)
  375. case raft.RemoveNode:
  376. cfg := new(raft.Config)
  377. if err := json.Unmarshal(ent.Data, cfg); err != nil {
  378. log.Printf("id=%x participant.cluster.removeNode unmarshalErr=\"%v\"\n", p.id, err)
  379. break
  380. }
  381. peer, err := p.peerHub.peer(cfg.NodeId)
  382. if err != nil {
  383. log.Fatal("id=%x participant.apply getPeerErr=\"%v\"", p.id, err)
  384. }
  385. peer.idle()
  386. pp := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
  387. p.Store.Delete(pp, false, false)
  388. log.Printf("id=%x participant.cluster.removeNode nodeId=%x\n", p.id, cfg.NodeId)
  389. default:
  390. panic("unimplemented")
  391. }
  392. }
  393. }
  394. func (p *participant) save(ents []raft.Entry, state raft.State) {
  395. for _, ent := range ents {
  396. if err := p.w.SaveEntry(&ent); err != nil {
  397. log.Panicf("id=%x participant.save saveEntryErr=%q", p.id, err)
  398. }
  399. }
  400. if !state.IsEmpty() {
  401. if err := p.w.SaveState(&state); err != nil {
  402. log.Panicf("id=%x participant.save saveStateErr=%q", p.id, err)
  403. }
  404. }
  405. if err := p.w.Sync(); err != nil {
  406. log.Panicf("id=%x participant.save syncErr=%q", p.id, err)
  407. }
  408. }
  409. func (p *participant) send(msgs []raft.Message) {
  410. for i := range msgs {
  411. if err := p.peerHub.send(msgs[i]); err != nil {
  412. log.Printf("id=%x participant.send err=\"%v\"\n", p.id, err)
  413. }
  414. }
  415. }
  416. func (p *participant) join() error {
  417. info := &context{
  418. MinVersion: store.MinVersion(),
  419. MaxVersion: store.MaxVersion(),
  420. ClientURL: p.pubAddr,
  421. PeerURL: p.raftPubAddr,
  422. }
  423. max := p.cfg.MaxRetryAttempts
  424. for attempt := 0; ; attempt++ {
  425. for seed := range p.peerHub.getSeeds() {
  426. if err := p.client.AddMachine(seed, fmt.Sprint(p.id), info); err == nil {
  427. return nil
  428. } else {
  429. log.Printf("id=%x participant.join addMachineErr=\"%v\"\n", p.id, err)
  430. }
  431. }
  432. if attempt == max {
  433. return fmt.Errorf("etcd: cannot join cluster after %d attempts", max)
  434. }
  435. time.Sleep(time.Millisecond * time.Duration(p.cfg.RetryInterval*1000))
  436. }
  437. }
  // genId returns a pseudo-random non-negative int64 (Int63) used as a new
  // node or cluster id.
  //
  // The generator is seeded with the full Unix timestamp in nanoseconds.
  // Seeding with Time.Nanosecond, which is only the offset within the
  // current second, would give identical ids to processes started at the
  // same sub-second offset in different seconds.
  // NOTE(review): math/rand is not a cryptographic source; ids are only
  // unique with high probability.
  func genId() int64 {
  	r := rand.New(rand.NewSource(time.Now().UnixNano()))
  	return r.Int63()
  }