etcd.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. package etcd
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. "net/http"
  7. "path"
  8. "time"
  9. "github.com/coreos/etcd/config"
  10. "github.com/coreos/etcd/raft"
  11. "github.com/coreos/etcd/store"
  12. )
  // Raft defaults handed to raft.New (heartbeat and election timeouts,
// presumably counted in ticks) and the wall-clock length of one tick.
const (
	defaultHeartbeat    = 1
	defaultElection     = 5
	defaultTickDuration = 100 * time.Millisecond
)

// Store keys and HTTP path prefixes used by Server, both for the routes it
// serves and for the peer endpoints it posts to.
const (
	// v2machineKVPrefix is the store directory under which apply records the
	// address of each member added to the cluster, keyed by node id.
	v2machineKVPrefix = "/_etcd/machines"

	v2Prefix        = "/v2/keys"
	v2machinePrefix = "/v2/machines"
	v2LeaderPrefix  = "/v2/leader"
	raftPrefix      = "/raft"
)
  23. type Server struct {
  24. config *config.Config
  25. id int
  26. pubAddr string
  27. nodes map[string]bool
  28. tickDuration time.Duration
  29. proposal chan v2Proposal
  30. node *v2Raft
  31. t *transporter
  32. store.Store
  33. stop chan struct{}
  34. http.Handler
  35. }
  36. func New(c *config.Config, id int) *Server {
  37. if err := c.Sanitize(); err != nil {
  38. log.Fatalf("failed sanitizing configuration: %v", err)
  39. }
  40. s := &Server{
  41. config: c,
  42. id: id,
  43. pubAddr: c.Addr,
  44. nodes: make(map[string]bool),
  45. tickDuration: defaultTickDuration,
  46. proposal: make(chan v2Proposal),
  47. node: &v2Raft{
  48. Node: raft.New(id, defaultHeartbeat, defaultElection),
  49. result: make(map[wait]chan interface{}),
  50. },
  51. t: newTransporter(),
  52. Store: store.New(),
  53. stop: make(chan struct{}),
  54. }
  55. for _, seed := range c.Peers {
  56. s.nodes[seed] = true
  57. }
  58. m := http.NewServeMux()
  59. //m.Handle("/HEAD", handlerErr(s.serveHead))
  60. m.Handle(v2Prefix+"/", handlerErr(s.serveValue))
  61. m.Handle("/raft", s.t)
  62. m.Handle(v2machinePrefix, handlerErr(s.serveMachines))
  63. m.Handle(v2LeaderPrefix, handlerErr(s.serveLeader))
  64. s.Handler = m
  65. return s
  66. }
  67. func (s *Server) SetTick(d time.Duration) {
  68. s.tickDuration = d
  69. }
  70. func (s *Server) Run() {
  71. if len(s.config.Peers) == 0 {
  72. s.Bootstrap()
  73. } else {
  74. s.Join()
  75. }
  76. }
  77. func (s *Server) Stop() {
  78. close(s.stop)
  79. s.t.stop()
  80. }
  81. func (s *Server) Bootstrap() {
  82. log.Println("starting a bootstrap node")
  83. s.node.Campaign()
  84. s.node.Add(s.id, s.pubAddr)
  85. s.apply(s.node.Next())
  86. s.run()
  87. }
  88. func (s *Server) Join() {
  89. log.Println("joining cluster via peers", s.config.Peers)
  90. d, err := json.Marshal(&raft.Config{s.id, s.pubAddr})
  91. if err != nil {
  92. panic(err)
  93. }
  94. b, err := json.Marshal(&raft.Message{From: s.id, Type: 2, Entries: []raft.Entry{{Type: 1, Data: d}}})
  95. if err != nil {
  96. panic(err)
  97. }
  98. for seed := range s.nodes {
  99. if err := s.t.send(seed+raftPrefix, b); err != nil {
  100. log.Println(err)
  101. continue
  102. }
  103. // todo(xiangli) WAIT for join to be committed or retry...
  104. break
  105. }
  106. s.run()
  107. }
  108. func (s *Server) run() {
  109. node := s.node
  110. recv := s.t.recv
  111. ticker := time.NewTicker(s.tickDuration)
  112. v2SyncTicker := time.NewTicker(time.Millisecond * 500)
  113. var proposal chan v2Proposal
  114. for {
  115. if node.HasLeader() {
  116. proposal = s.proposal
  117. } else {
  118. proposal = nil
  119. }
  120. select {
  121. case p := <-proposal:
  122. node.Propose(p)
  123. case msg := <-recv:
  124. node.Step(*msg)
  125. case <-ticker.C:
  126. node.Tick()
  127. case <-v2SyncTicker.C:
  128. node.Sync()
  129. case <-s.stop:
  130. log.Printf("Node: %d stopped\n", s.id)
  131. return
  132. }
  133. s.apply(node.Next())
  134. s.send(node.Msgs())
  135. }
  136. }
  137. func (s *Server) apply(ents []raft.Entry) {
  138. offset := s.node.Applied() - len(ents) + 1
  139. for i, ent := range ents {
  140. switch ent.Type {
  141. // expose raft entry type
  142. case raft.Normal:
  143. if len(ent.Data) == 0 {
  144. continue
  145. }
  146. s.v2apply(offset+i, ent)
  147. case raft.AddNode:
  148. cfg := new(raft.Config)
  149. if err := json.Unmarshal(ent.Data, cfg); err != nil {
  150. log.Println(err)
  151. break
  152. }
  153. if err := s.t.set(cfg.NodeId, cfg.Addr); err != nil {
  154. log.Println(err)
  155. break
  156. }
  157. log.Printf("Add Node %x %v\n", cfg.NodeId, cfg.Addr)
  158. s.nodes[cfg.Addr] = true
  159. p := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
  160. s.Store.Set(p, false, cfg.Addr, store.Permanent)
  161. default:
  162. panic("unimplemented")
  163. }
  164. }
  165. }
  166. func (s *Server) send(msgs []raft.Message) {
  167. for i := range msgs {
  168. data, err := json.Marshal(msgs[i])
  169. if err != nil {
  170. // todo(xiangli): error handling
  171. log.Fatal(err)
  172. }
  173. // todo(xiangli): reuse routines and limit the number of sending routines
  174. // sync.Pool?
  175. go func(i int) {
  176. var err error
  177. if err = s.t.sendTo(msgs[i].To, data); err == nil {
  178. return
  179. }
  180. if err == errUnknownNode {
  181. err = s.fetchAddr(msgs[i].To)
  182. }
  183. if err == nil {
  184. err = s.t.sendTo(msgs[i].To, data)
  185. }
  186. if err != nil {
  187. log.Println(err)
  188. }
  189. }(i)
  190. }
  191. }
  192. func (s *Server) fetchAddr(nodeId int) error {
  193. for seed := range s.nodes {
  194. if err := s.t.fetchAddr(seed, nodeId); err == nil {
  195. return nil
  196. }
  197. }
  198. return fmt.Errorf("cannot fetch the address of node %d", nodeId)
  199. }