etcd.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package etcd
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "log"
  6. "net/http"
  7. "path"
  8. "time"
  9. "github.com/coreos/etcd/config"
  10. "github.com/coreos/etcd/raft"
  11. "github.com/coreos/etcd/store"
  12. )
  13. const (
  14. defaultHeartbeat = 1
  15. defaultElection = 5
  16. defaultTickDuration = time.Millisecond * 100
  17. v2machineKVPrefix = "/_etcd/machines"
  18. v2Prefix = "/v2/keys"
  19. raftPrefix = "/raft"
  20. )
  21. type Server struct {
  22. config *config.Config
  23. id int
  24. pubAddr string
  25. nodes map[string]bool
  26. tickDuration time.Duration
  27. proposal chan v2Proposal
  28. node *v2Raft
  29. t *transporter
  30. store.Store
  31. stop chan struct{}
  32. http.Handler
  33. }
  34. func New(c *config.Config, id int) *Server {
  35. if err := c.Sanitize(); err != nil {
  36. log.Fatalf("failed sanitizing configuration: %v", err)
  37. }
  38. s := &Server{
  39. config: c,
  40. id: id,
  41. pubAddr: c.Addr,
  42. nodes: make(map[string]bool),
  43. tickDuration: defaultTickDuration,
  44. proposal: make(chan v2Proposal),
  45. node: &v2Raft{
  46. Node: raft.New(id, defaultHeartbeat, defaultElection),
  47. result: make(map[wait]chan interface{}),
  48. },
  49. t: newTransporter(),
  50. Store: store.New(),
  51. stop: make(chan struct{}),
  52. }
  53. for _, seed := range c.Peers {
  54. s.nodes[seed] = true
  55. }
  56. m := http.NewServeMux()
  57. //m.Handle("/HEAD", handlerErr(s.serveHead))
  58. m.Handle("/", handlerErr(s.serveValue))
  59. m.Handle("/raft", s.t)
  60. s.Handler = m
  61. return s
  62. }
  63. func (s *Server) SetTick(d time.Duration) {
  64. s.tickDuration = d
  65. }
  66. func (s *Server) Run() {
  67. if len(s.config.Peers) == 0 {
  68. s.Bootstrap()
  69. } else {
  70. s.Join()
  71. }
  72. }
  73. func (s *Server) Stop() {
  74. close(s.stop)
  75. s.t.stop()
  76. }
  77. func (s *Server) Bootstrap() {
  78. log.Println("starting a bootstrap node")
  79. s.node.Campaign()
  80. s.node.Add(s.id, s.pubAddr)
  81. s.apply(s.node.Next())
  82. s.run()
  83. }
  84. func (s *Server) Join() {
  85. log.Println("joining cluster via peers", s.config.Peers)
  86. d, err := json.Marshal(&raft.Config{s.id, s.pubAddr})
  87. if err != nil {
  88. panic(err)
  89. }
  90. b, err := json.Marshal(&raft.Message{From: s.id, Type: 2, Entries: []raft.Entry{{Type: 1, Data: d}}})
  91. if err != nil {
  92. panic(err)
  93. }
  94. for seed := range s.nodes {
  95. if err := s.t.send(seed+raftPrefix, b); err != nil {
  96. log.Println(err)
  97. continue
  98. }
  99. // todo(xiangli) WAIT for join to be committed or retry...
  100. break
  101. }
  102. s.run()
  103. }
  104. func (s *Server) run() {
  105. node := s.node
  106. recv := s.t.recv
  107. ticker := time.NewTicker(s.tickDuration)
  108. v2SyncTicker := time.NewTicker(time.Millisecond * 500)
  109. var proposal chan v2Proposal
  110. for {
  111. if node.HasLeader() {
  112. proposal = s.proposal
  113. } else {
  114. proposal = nil
  115. }
  116. select {
  117. case p := <-proposal:
  118. node.Propose(p)
  119. case msg := <-recv:
  120. node.Step(*msg)
  121. case <-ticker.C:
  122. node.Tick()
  123. case <-v2SyncTicker.C:
  124. node.Sync()
  125. case <-s.stop:
  126. log.Printf("Node: %d stopped\n", s.id)
  127. return
  128. }
  129. s.apply(node.Next())
  130. s.send(node.Msgs())
  131. }
  132. }
  133. func (s *Server) apply(ents []raft.Entry) {
  134. offset := s.node.Applied() - len(ents) + 1
  135. for i, ent := range ents {
  136. switch ent.Type {
  137. // expose raft entry type
  138. case raft.Normal:
  139. if len(ent.Data) == 0 {
  140. continue
  141. }
  142. s.v2apply(offset+i, ent)
  143. case raft.AddNode:
  144. cfg := new(raft.Config)
  145. if err := json.Unmarshal(ent.Data, cfg); err != nil {
  146. log.Println(err)
  147. break
  148. }
  149. if err := s.t.set(cfg.NodeId, cfg.Addr); err != nil {
  150. log.Println(err)
  151. break
  152. }
  153. log.Printf("Add Node %x %v\n", cfg.NodeId, cfg.Addr)
  154. s.nodes[cfg.Addr] = true
  155. p := path.Join(v2machineKVPrefix, fmt.Sprint(cfg.NodeId))
  156. s.Store.Set(p, false, cfg.Addr, store.Permanent)
  157. default:
  158. panic("unimplemented")
  159. }
  160. }
  161. }
  162. func (s *Server) send(msgs []raft.Message) {
  163. for i := range msgs {
  164. data, err := json.Marshal(msgs[i])
  165. if err != nil {
  166. // todo(xiangli): error handling
  167. log.Fatal(err)
  168. }
  169. // todo(xiangli): reuse routines and limit the number of sending routines
  170. // sync.Pool?
  171. go func(i int) {
  172. var err error
  173. if err = s.t.sendTo(msgs[i].To, data); err == nil {
  174. return
  175. }
  176. if err == errUnknownNode {
  177. err = s.fetchAddr(msgs[i].To)
  178. }
  179. if err == nil {
  180. err = s.t.sendTo(msgs[i].To, data)
  181. }
  182. if err != nil {
  183. log.Println(err)
  184. }
  185. }(i)
  186. }
  187. }
  188. func (s *Server) fetchAddr(nodeId int) error {
  189. for seed := range s.nodes {
  190. if err := s.t.fetchAddr(seed, nodeId); err == nil {
  191. return nil
  192. }
  193. }
  194. return fmt.Errorf("cannot fetch the address of node %d", nodeId)
  195. }