participant.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522
  1. /*
  2. Copyright 2014 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdserver
  14. import (
  15. "encoding/json"
  16. "fmt"
  17. "log"
  18. "math/rand"
  19. "net/http"
  20. "net/url"
  21. "os"
  22. "path"
  23. "time"
  24. "github.com/coreos/etcd/conf"
  25. etcdErr "github.com/coreos/etcd/error"
  26. "github.com/coreos/etcd/raft"
  27. "github.com/coreos/etcd/snap"
  28. "github.com/coreos/etcd/store"
  29. "github.com/coreos/etcd/wal"
  30. )
  // Raft timing and buffering defaults.
  const (
  	// defaultHeartbeat and defaultElection are handed to raft.New and
  	// raft.Recover when the raft node is built (presumably expressed in
  	// ticks — TODO confirm against the raft package).
  	defaultHeartbeat = 1
  	defaultElection  = 5

  	// maxBufferedProposal is the capacity of participant.proposal.
  	maxBufferedProposal = 128

  	// defaultTickDuration is not referenced in this part of the file;
  	// presumably the default value for participant.tickDuration — verify
  	// against the caller of newParticipant.
  	defaultTickDuration = 100 * time.Millisecond
  )

  // Key prefixes inside the v2 store.
  const (
  	// v2machineKVPrefix is the directory holding one
  	// "raft=<addr>&etcd=<addr>" record per cluster member, keyed by node id.
  	v2machineKVPrefix = "/_etcd/machines"
  	v2configKVPrefix  = "/_etcd/config"
  )

  // HTTP path prefixes registered by newParticipant.
  const (
  	v2Prefix            = "/v2/keys"
  	v2machinePrefix     = "/v2/machines"
  	v2peersPrefix       = "/v2/peers"
  	v2LeaderPrefix      = "/v2/leader"
  	v2SelfStatsPrefix   = "/v2/stats/self"
  	v2LeaderStatsPrefix = "/v2/stats/leader"
  	v2StoreStatsPrefix  = "/v2/stats/store"

  	// The admin endpoints are registered on the raft handler, not on the
  	// participant's own ServeMux.
  	v2adminConfigPrefix   = "/v2/admin/config"
  	v2adminMachinesPrefix = "/v2/admin/machines/"
  )
  // defaultCompact is the raft log length above which run compacts the log
  // and writes a snapshot. It is a variable rather than a constant
  // (presumably so that it can be overridden, e.g. by tests — TODO confirm).
  var defaultCompact = 10000

  // Errors produced by the participant.
  var (
  	// tmpErr is returned by add and remove when a membership change could
  	// not be proposed or was not observed in time.
  	tmpErr = fmt.Errorf("try again")

  	// stopErr is returned by add and remove when the participant is shut
  	// down while they are waiting.
  	stopErr = fmt.Errorf("server is stopped")

  	// raftStopErr is not referenced in this part of the file.
  	raftStopErr = fmt.Errorf("raft is stopped")
  )
  54. type participant struct {
  55. id int64
  56. clusterId int64
  57. cfg *conf.Config
  58. pubAddr string
  59. raftPubAddr string
  60. tickDuration time.Duration
  61. client *v2client
  62. peerHub *peerHub
  63. proposal chan v2Proposal
  64. addNodeC chan raft.Config
  65. removeNodeC chan raft.Config
  66. node *v2Raft
  67. store.Store
  68. rh *raftHandler
  69. w *wal.WAL
  70. snapshotter *snap.Snapshotter
  71. serverStats *raftServerStats
  72. stopNotifyc chan struct{}
  73. *http.ServeMux
  74. }
  75. func newParticipant(c *conf.Config, client *v2client, peerHub *peerHub, tickDuration time.Duration) (*participant, error) {
  76. p := &participant{
  77. clusterId: -1,
  78. cfg: c,
  79. tickDuration: tickDuration,
  80. client: client,
  81. peerHub: peerHub,
  82. proposal: make(chan v2Proposal, maxBufferedProposal),
  83. addNodeC: make(chan raft.Config, 1),
  84. removeNodeC: make(chan raft.Config, 1),
  85. node: &v2Raft{
  86. result: make(map[wait]chan interface{}),
  87. },
  88. Store: store.New(),
  89. serverStats: NewRaftServerStats(c.Name),
  90. stopNotifyc: make(chan struct{}),
  91. ServeMux: http.NewServeMux(),
  92. }
  93. p.rh = newRaftHandler(peerHub, p.Store.Version(), p.serverStats)
  94. p.peerHub.setServerStats(p.serverStats)
  95. snapDir := path.Join(p.cfg.DataDir, "snap")
  96. if err := os.Mkdir(snapDir, 0700); err != nil {
  97. if !os.IsExist(err) {
  98. log.Printf("id=%x participant.new.mkdir err=%v", p.id, err)
  99. return nil, err
  100. }
  101. }
  102. p.snapshotter = snap.New(snapDir)
  103. walDir := path.Join(p.cfg.DataDir, "wal")
  104. if err := os.Mkdir(walDir, 0700); err != nil {
  105. if !os.IsExist(err) {
  106. log.Printf("id=%x participant.new.mkdir err=%v", p.id, err)
  107. return nil, err
  108. }
  109. }
  110. var w *wal.WAL
  111. var err error
  112. if !wal.Exist(walDir) {
  113. p.id = genId()
  114. p.pubAddr = c.Addr
  115. p.raftPubAddr = c.Peer.Addr
  116. if w, err = wal.Create(walDir); err != nil {
  117. return nil, err
  118. }
  119. p.node.Node = raft.New(p.id, defaultHeartbeat, defaultElection)
  120. info := p.node.Info()
  121. if err = w.SaveInfo(&info); err != nil {
  122. return nil, err
  123. }
  124. log.Printf("id=%x participant.new path=%s\n", p.id, walDir)
  125. } else {
  126. s, err := p.snapshotter.Load()
  127. if err != nil && err != snap.ErrNoSnapshot {
  128. log.Printf("participant.snapload err=%s\n", err)
  129. return nil, err
  130. }
  131. var snapIndex int64
  132. if s != nil {
  133. if err := p.Recovery(s.Data); err != nil {
  134. log.Printf("store.recover err=%v", err)
  135. return nil, err
  136. }
  137. log.Printf("participant.store.recovered index=%d\n", s.Index)
  138. for _, node := range s.Nodes {
  139. pp := path.Join(v2machineKVPrefix, fmt.Sprint(node))
  140. ev, err := p.Store.Get(pp, false, false)
  141. if err != nil {
  142. log.Printf("store.get err=%v", err)
  143. return nil, err
  144. }
  145. q, err := url.ParseQuery(*ev.Node.Value)
  146. if err != nil {
  147. log.Printf("url.parse err=%v", err)
  148. return nil, err
  149. }
  150. peer, err := p.peerHub.add(node, q["raft"][0])
  151. if err != nil {
  152. log.Printf("peerHub.add err=%v", err)
  153. return nil, err
  154. }
  155. peer.participate()
  156. }
  157. snapIndex = s.Index
  158. }
  159. n, err := wal.Read(walDir, snapIndex)
  160. if err != nil {
  161. return nil, err
  162. }
  163. p.id = n.Id
  164. p.node.Node = raft.Recover(n.Id, s, n.Ents, n.State, defaultHeartbeat, defaultElection)
  165. p.apply(p.node.Next())
  166. log.Printf("id=%x participant.load path=%s snapIndex=%d state=\"%+v\" len(ents)=%d", p.id, p.cfg.DataDir, snapIndex, n.State, len(n.Ents))
  167. if w, err = wal.Open(walDir); err != nil {
  168. return nil, err
  169. }
  170. }
  171. p.w = w
  172. p.Handle(v2Prefix+"/", handlerErr(p.serveValue))
  173. p.Handle(v2machinePrefix, handlerErr(p.serveMachines))
  174. p.Handle(v2peersPrefix, handlerErr(p.serveMachines))
  175. p.Handle(v2LeaderPrefix, handlerErr(p.serveLeader))
  176. p.Handle(v2SelfStatsPrefix, handlerErr(p.serveSelfStats))
  177. p.Handle(v2LeaderStatsPrefix, handlerErr(p.serveLeaderStats))
  178. p.Handle(v2StoreStatsPrefix, handlerErr(p.serveStoreStats))
  179. p.rh.Handle(v2adminConfigPrefix, handlerErr(p.serveAdminConfig))
  180. p.rh.Handle(v2adminMachinesPrefix, handlerErr(p.serveAdminMachines))
  181. // TODO: remind to set application/json for /v2/stats endpoint
  182. return p, nil
  183. }
  184. func (p *participant) run(stop chan struct{}) error {
  185. defer p.cleanup()
  186. if p.node.IsEmpty() {
  187. seeds := p.peerHub.getSeeds()
  188. if len(seeds) == 0 {
  189. log.Printf("id=%x participant.run action=bootstrap\n", p.id)
  190. p.node.Campaign()
  191. p.node.InitCluster(genId())
  192. p.node.Add(p.id, p.raftPubAddr, []byte(p.pubAddr))
  193. p.apply(p.node.Next())
  194. } else {
  195. log.Printf("id=%x participant.run action=join seeds=\"%v\"\n", p.id, seeds)
  196. if err := p.join(); err != nil {
  197. log.Printf("id=%x participant.join err=%q", p.id, err)
  198. return err
  199. }
  200. }
  201. }
  202. p.rh.start()
  203. defer p.rh.stop()
  204. node := p.node
  205. recv := p.rh.recv
  206. ticker := time.NewTicker(p.tickDuration)
  207. defer ticker.Stop()
  208. v2SyncTicker := time.NewTicker(time.Millisecond * 500)
  209. defer v2SyncTicker.Stop()
  210. var proposal chan v2Proposal
  211. var addNodeC, removeNodeC chan raft.Config
  212. for {
  213. if node.HasLeader() {
  214. proposal = p.proposal
  215. addNodeC = p.addNodeC
  216. removeNodeC = p.removeNodeC
  217. } else {
  218. proposal = nil
  219. addNodeC = nil
  220. removeNodeC = nil
  221. }
  222. select {
  223. case p := <-proposal:
  224. node.Propose(p)
  225. case c := <-addNodeC:
  226. node.UpdateConf(raft.AddNode, &c)
  227. case c := <-removeNodeC:
  228. node.UpdateConf(raft.RemoveNode, &c)
  229. case msg := <-recv:
  230. node.Step(*msg)
  231. case <-ticker.C:
  232. node.Tick()
  233. case <-v2SyncTicker.C:
  234. node.Sync()
  235. case <-stop:
  236. log.Printf("id=%x participant.stop\n", p.id)
  237. return nil
  238. }
  239. if s := node.UnstableSnapshot(); !s.IsEmpty() {
  240. if err := p.Recovery(s.Data); err != nil {
  241. log.Printf("id=%x participant.recover err=%q", p.id, err)
  242. return err
  243. }
  244. log.Printf("id=%x participant.recovered index=%d", p.id, s.Index)
  245. }
  246. p.apply(node.Next())
  247. if err := p.save(node.UnstableEnts(), node.UnstableState()); err != nil {
  248. return err
  249. }
  250. p.send(node.Msgs())
  251. if node.IsRemoved() {
  252. log.Printf("id=%x participant.end\n", p.id)
  253. return nil
  254. }
  255. if p.node.EntsLen() > defaultCompact {
  256. d, err := p.Save()
  257. if err != nil {
  258. log.Printf("id=%x participant.compact err=%q", p.id, err)
  259. return err
  260. }
  261. p.node.Compact(d)
  262. snap := p.node.GetSnap()
  263. log.Printf("id=%x compacted index=%d", p.id, snap.Index)
  264. if err := p.snapshotter.Save(&snap); err != nil {
  265. log.Printf("id=%x snapshot.save err=%v", p.id, err)
  266. return err
  267. }
  268. if err := p.w.Cut(p.node.Index()); err != nil {
  269. log.Printf("id=%x wal.cut err=%v", p.id, err)
  270. return err
  271. }
  272. info := p.node.Info()
  273. if err = p.w.SaveInfo(&info); err != nil {
  274. log.Printf("id=%x wal.saveInfo err=%v", p.id, err)
  275. return err
  276. }
  277. }
  278. }
  279. }
  280. func (p *participant) cleanup() {
  281. p.w.Close()
  282. close(p.stopNotifyc)
  283. p.peerHub.stop()
  284. }
  285. func (p *participant) raftHandler() http.Handler {
  286. return p.rh
  287. }
  288. func (p *participant) add(id int64, raftPubAddr string, pubAddr string) error {
  289. log.Printf("id=%x participant.add nodeId=%x raftPubAddr=%s pubAddr=%s\n", p.id, id, raftPubAddr, pubAddr)
  290. pp := path.Join(v2machineKVPrefix, fmt.Sprint(id))
  291. _, err := p.Store.Get(pp, false, false)
  292. if err == nil {
  293. return nil
  294. }
  295. if v, ok := err.(*etcdErr.Error); !ok || v.ErrorCode != etcdErr.EcodeKeyNotFound {
  296. log.Printf("id=%x participant.add getErr=\"%v\"\n", p.id, err)
  297. return err
  298. }
  299. w, err := p.Watch(pp, true, false, 0)
  300. if err != nil {
  301. log.Printf("id=%x participant.add watchErr=\"%v\"\n", p.id, err)
  302. return tmpErr
  303. }
  304. select {
  305. case p.addNodeC <- raft.Config{NodeId: id, Addr: raftPubAddr, Context: []byte(pubAddr)}:
  306. default:
  307. w.Remove()
  308. log.Printf("id=%x participant.add proposeErr=\"unable to send out addNode proposal\"\n", p.id)
  309. return tmpErr
  310. }
  311. select {
  312. case v := <-w.EventChan:
  313. if v.Action == store.Set {
  314. return nil
  315. }
  316. log.Printf("id=%x participant.add watchErr=\"unexpected action\" action=%s\n", p.id, v.Action)
  317. return tmpErr
  318. case <-time.After(6 * defaultHeartbeat * p.tickDuration):
  319. w.Remove()
  320. log.Printf("id=%x participant.add watchErr=timeout\n", p.id)
  321. return tmpErr
  322. case <-p.stopNotifyc:
  323. return stopErr
  324. }
  325. }
  326. func (p *participant) remove(id int64) error {
  327. log.Printf("id=%x participant.remove nodeId=%x\n", p.id, id)
  328. pp := path.Join(v2machineKVPrefix, fmt.Sprint(id))
  329. v, err := p.Store.Get(pp, false, false)
  330. if err != nil {
  331. return nil
  332. }
  333. select {
  334. case p.removeNodeC <- raft.Config{NodeId: id}:
  335. default:
  336. log.Printf("id=%x participant.remove proposeErr=\"unable to send out removeNode proposal\"\n", p.id)
  337. return tmpErr
  338. }
  339. // TODO(xiangli): do not need to watch if the
  340. // removal target is self
  341. w, err := p.Watch(pp, true, false, v.Index()+1)
  342. if err != nil {
  343. log.Printf("id=%x participant.remove watchErr=\"%v\"\n", p.id, err)
  344. return tmpErr
  345. }
  346. select {
  347. case v := <-w.EventChan:
  348. if v.Action == store.Delete {
  349. return nil
  350. }
  351. log.Printf("id=%x participant.remove watchErr=\"unexpected action\" action=%s\n", p.id, v.Action)
  352. return tmpErr
  353. case <-time.After(6 * defaultHeartbeat * p.tickDuration):
  354. w.Remove()
  355. log.Printf("id=%x participant.remove watchErr=timeout\n", p.id)
  356. return tmpErr
  357. case <-p.stopNotifyc:
  358. return stopErr
  359. }
  360. }
  361. func (p *participant) apply(ents []raft.Entry) {
  362. offset := p.node.Applied() - int64(len(ents)) + 1
  363. for i, ent := range ents {
  364. switch ent.Type {
  365. // expose raft entry type
  366. case raft.Normal:
  367. if len(ent.Data) == 0 {
  368. continue
  369. }
  370. p.v2apply(offset+int64(i), ent)
  371. case raft.ClusterInit:
  372. p.clusterId = p.node.ClusterId()
  373. log.Printf("id=%x participant.cluster.setId clusterId=%x\n", p.id, p.clusterId)
  374. case raft.AddNode:
  375. cfg := new(raft.Config)
  376. if err := json.Unmarshal(ent.Data, cfg); err != nil {
  377. log.Printf("id=%x participant.cluster.addNode unmarshalErr=\"%v\"\n", p.id, err)
  378. break
  379. }
  380. pp := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
  381. if ev, _ := p.Store.Get(pp, false, false); ev != nil {
  382. log.Printf("id=%x participant.cluster.addNode err=existed value=%q", p.id, *ev.Node.Value)
  383. break
  384. }
  385. peer, err := p.peerHub.add(cfg.NodeId, cfg.Addr)
  386. if err != nil {
  387. log.Printf("id=%x participant.cluster.addNode peerAddErr=\"%v\"\n", p.id, err)
  388. break
  389. }
  390. peer.participate()
  391. p.Store.Set(pp, false, fmt.Sprintf("raft=%v&etcd=%v", cfg.Addr, string(cfg.Context)), store.Permanent)
  392. if p.id == cfg.NodeId {
  393. p.raftPubAddr = cfg.Addr
  394. p.pubAddr = string(cfg.Context)
  395. }
  396. log.Printf("id=%x participant.cluster.addNode nodeId=%x addr=%s context=%s\n", p.id, cfg.NodeId, cfg.Addr, cfg.Context)
  397. case raft.RemoveNode:
  398. cfg := new(raft.Config)
  399. if err := json.Unmarshal(ent.Data, cfg); err != nil {
  400. log.Printf("id=%x participant.cluster.removeNode unmarshalErr=\"%v\"\n", p.id, err)
  401. break
  402. }
  403. peer, err := p.peerHub.peer(cfg.NodeId)
  404. if err != nil {
  405. log.Fatal("id=%x participant.apply getPeerErr=\"%v\"", p.id, err)
  406. }
  407. peer.idle()
  408. pp := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
  409. p.Store.Delete(pp, false, false)
  410. log.Printf("id=%x participant.cluster.removeNode nodeId=%x\n", p.id, cfg.NodeId)
  411. default:
  412. panic("unimplemented")
  413. }
  414. }
  415. }
  416. func (p *participant) save(ents []raft.Entry, state raft.State) error {
  417. for _, ent := range ents {
  418. if err := p.w.SaveEntry(&ent); err != nil {
  419. log.Printf("id=%x participant.save saveEntryErr=%q", p.id, err)
  420. return err
  421. }
  422. }
  423. if !state.IsEmpty() {
  424. if err := p.w.SaveState(&state); err != nil {
  425. log.Printf("id=%x participant.save saveStateErr=%q", p.id, err)
  426. return err
  427. }
  428. }
  429. if err := p.w.Sync(); err != nil {
  430. log.Printf("id=%x participant.save syncErr=%q", p.id, err)
  431. return err
  432. }
  433. return nil
  434. }
  435. func (p *participant) send(msgs []raft.Message) {
  436. for i := range msgs {
  437. if err := p.peerHub.send(msgs[i]); err != nil {
  438. log.Printf("id=%x participant.send err=\"%v\"\n", p.id, err)
  439. }
  440. }
  441. }
  442. func (p *participant) join() error {
  443. info := &context{
  444. MinVersion: store.MinVersion(),
  445. MaxVersion: store.MaxVersion(),
  446. ClientURL: p.pubAddr,
  447. PeerURL: p.raftPubAddr,
  448. }
  449. max := p.cfg.MaxRetryAttempts
  450. for attempt := 0; ; attempt++ {
  451. for seed := range p.peerHub.getSeeds() {
  452. if err := p.client.AddMachine(seed, fmt.Sprint(p.id), info); err == nil {
  453. return nil
  454. } else {
  455. log.Printf("id=%x participant.join addMachineErr=\"%v\"\n", p.id, err)
  456. }
  457. }
  458. if attempt == max {
  459. return fmt.Errorf("etcd: cannot join cluster after %d attempts", max)
  460. }
  461. time.Sleep(time.Millisecond * time.Duration(p.cfg.RetryInterval*1000))
  462. }
  463. }
  // genId returns a non-negative pseudo-random 63-bit id.
  //
  // The generator is seeded with the full wall-clock time in nanoseconds since
  // the Unix epoch. Seeding with Time.Nanosecond() would use only the offset
  // within the current second (0..999999999), leaving about 1e9 possible seeds
  // and therefore about 1e9 possible ids.
  func genId() int64 {
  	r := rand.New(rand.NewSource(time.Now().UnixNano()))
  	return r.Int63()
  }