server.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. package etcdserver
  2. import (
  3. "errors"
  4. "time"
  5. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  6. "github.com/coreos/etcd/raft"
  7. "github.com/coreos/etcd/raft/raftpb"
  8. "github.com/coreos/etcd/store"
  9. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  10. "github.com/coreos/etcd/wait"
  11. )
  // Sentinel errors reported by this package.
  var (
  	// ErrUnknownMethod is reported for a request whose Method is not one
  	// of the methods the server handles.
  	ErrUnknownMethod = errors.New("etcdserver: unknown method")

  	// ErrStopped is reported to a caller waiting in Do when the server is
  	// stopped before its request completes.
  	ErrStopped = errors.New("etcdserver: server stopped")
  )
  16. type SendFunc func(m []raftpb.Message)
  17. type Response struct {
  18. Event *store.Event
  19. Watcher store.Watcher
  20. err error
  21. }
  22. type Server struct {
  23. w wait.Wait
  24. done chan struct{}
  25. Node raft.Node
  26. Store store.Store
  27. // Send specifies the send function for sending msgs to peers. Send
  28. // MUST NOT block. It is okay to drop messages, since clients should
  29. // timeout and reissue their messages. If Send is nil, Server will
  30. // panic.
  31. Send SendFunc
  32. // Save specifies the save function for saving ents to stable storage.
  33. // Save MUST block until st and ents are on stable storage. If Send is
  34. // nil, Server will panic.
  35. Save func(st raftpb.State, ents []raftpb.Entry)
  36. Ticker <-chan time.Time
  37. }
  38. // Start prepares and starts server in a new goroutine. It is no longer safe to
  39. // modify a Servers fields after it has been sent to Start.
  40. func Start(s *Server) {
  41. s.w = wait.New()
  42. s.done = make(chan struct{})
  43. go s.run()
  44. }
  45. func (s *Server) run() {
  46. for {
  47. select {
  48. case <-s.Ticker:
  49. s.Node.Tick()
  50. case rd := <-s.Node.Ready():
  51. s.Save(rd.State, rd.Entries)
  52. s.Send(rd.Messages)
  53. // TODO(bmizerany): do this in the background, but take
  54. // care to apply entries in a single goroutine, and not
  55. // race them.
  56. for _, e := range rd.CommittedEntries {
  57. var r pb.Request
  58. if err := r.Unmarshal(e.Data); err != nil {
  59. panic("TODO: this is bad, what do we do about it?")
  60. }
  61. s.w.Trigger(r.Id, s.apply(r))
  62. }
  63. case <-s.done:
  64. return
  65. }
  66. }
  67. }
  68. // Stop stops the server, and shutsdown the running goroutine. Stop should be
  69. // called after a Start(s), otherwise it will panic.
  70. func (s *Server) Stop() {
  71. s.Node.Stop()
  72. close(s.done)
  73. }
  74. // Do interprets r and performs an operation on s.Store according to r.Method
  75. // and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET with
  76. // Quorum == true, r will be sent through consensus before performing its
  77. // respective operation. Do will block until an action is performed or there is
  78. // an error.
  79. func (s *Server) Do(ctx context.Context, r pb.Request) (Response, error) {
  80. if r.Id == 0 {
  81. panic("r.Id cannot be 0")
  82. }
  83. if r.Method == "GET" && r.Quorum {
  84. r.Method = "QGET"
  85. }
  86. switch r.Method {
  87. case "POST", "PUT", "DELETE", "QGET":
  88. data, err := r.Marshal()
  89. if err != nil {
  90. return Response{}, err
  91. }
  92. ch := s.w.Register(r.Id)
  93. s.Node.Propose(ctx, data)
  94. select {
  95. case x := <-ch:
  96. resp := x.(Response)
  97. return resp, resp.err
  98. case <-ctx.Done():
  99. s.w.Trigger(r.Id, nil) // GC wait
  100. return Response{}, ctx.Err()
  101. case <-s.done:
  102. return Response{}, ErrStopped
  103. }
  104. case "GET":
  105. switch {
  106. case r.Wait:
  107. wc, err := s.Store.Watch(r.Path, r.Recursive, false, r.Since)
  108. if err != nil {
  109. return Response{}, err
  110. }
  111. return Response{Watcher: wc}, nil
  112. default:
  113. ev, err := s.Store.Get(r.Path, r.Recursive, r.Sorted)
  114. if err != nil {
  115. return Response{}, err
  116. }
  117. return Response{Event: ev}, nil
  118. }
  119. default:
  120. return Response{}, ErrUnknownMethod
  121. }
  122. }
  123. // apply interprets r as a call to store.X and returns an Response interpreted from store.Event
  124. func (s *Server) apply(r pb.Request) Response {
  125. f := func(ev *store.Event, err error) Response {
  126. return Response{Event: ev, err: err}
  127. }
  128. expr := time.Unix(0, r.Expiration)
  129. switch r.Method {
  130. case "POST":
  131. return f(s.Store.Create(r.Path, r.Dir, r.Val, true, expr))
  132. case "PUT":
  133. exists, existsSet := getBool(r.PrevExists)
  134. switch {
  135. case existsSet:
  136. if exists {
  137. return f(s.Store.Update(r.Path, r.Val, expr))
  138. } else {
  139. return f(s.Store.Create(r.Path, r.Dir, r.Val, false, expr))
  140. }
  141. case r.PrevIndex > 0 || r.PrevValue != "":
  142. return f(s.Store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
  143. default:
  144. return f(s.Store.Set(r.Path, r.Dir, r.Val, expr))
  145. }
  146. case "DELETE":
  147. switch {
  148. case r.PrevIndex > 0 || r.PrevValue != "":
  149. return f(s.Store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
  150. default:
  151. return f(s.Store.Delete(r.Path, r.Recursive, r.Dir))
  152. }
  153. case "QGET":
  154. return f(s.Store.Get(r.Path, r.Recursive, r.Sorted))
  155. default:
  156. // This should never be reached, but just in case:
  157. return Response{err: ErrUnknownMethod}
  158. }
  159. }
  // getBool unpacks an optional boolean. It returns the pointed-to value
  // and set == true when v is non-nil, and (false, false) when v is nil.
  func getBool(v *bool) (vv bool, set bool) {
  	if v != nil {
  		return *v, true
  	}
  	return false, false
  }