server.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package etcdserver
  2. import (
  3. "errors"
  4. "time"
  5. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  6. "github.com/coreos/etcd/raft"
  7. "github.com/coreos/etcd/raft/raftpb"
  8. "github.com/coreos/etcd/store"
  9. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  10. "github.com/coreos/etcd/wait"
  11. )
  12. var (
  13. ErrUnknownMethod = errors.New("etcdserver: unknown method")
  14. ErrStopped = errors.New("etcdserver: server stopped")
  15. )
  16. type SendFunc func(m []raftpb.Message)
  17. type SaveFunc func(st raftpb.State, ents []raftpb.Entry)
  18. type Response struct {
  19. Event *store.Event
  20. Watcher store.Watcher
  21. err error
  22. }
  23. type Server interface {
  24. // Start performs any initialization of the Server necessary for it to
  25. // begin serving requests. It must be called before Do or Process.
  26. // Start must be non-blocking; any long-running server functionality
  27. // should be implemented in goroutines.
  28. Start()
  29. // Stop terminates the Server and performs any necessary finalization.
  30. // Do and Process cannot be called after Stop has been invoked.
  31. Stop()
  32. // Do takes a request and attempts to fulfil it, returning a Response.
  33. Do(ctx context.Context, r pb.Request) (Response, error)
  34. // Process takes a raft message and applies it to the server's raft state
  35. // machine, respecting any timeout of the given context.
  36. Process(ctx context.Context, m raftpb.Message) error
  37. }
  38. // EtcdServer is the production implementation of the Server interface
  39. type EtcdServer struct {
  40. w wait.Wait
  41. done chan struct{}
  42. Node raft.Node
  43. Store store.Store
  44. // Send specifies the send function for sending msgs to peers. Send
  45. // MUST NOT block. It is okay to drop messages, since clients should
  46. // timeout and reissue their messages. If Send is nil, server will
  47. // panic.
  48. Send SendFunc
  49. // Save specifies the save function for saving ents to stable storage.
  50. // Save MUST block until st and ents are on stable storage. If Send is
  51. // nil, server will panic.
  52. Save func(st raftpb.State, ents []raftpb.Entry)
  53. Ticker <-chan time.Time
  54. }
  55. // Start prepares and starts server in a new goroutine. It is no longer safe to
  56. // modify a server's fields after it has been sent to Start.
  57. func (s *EtcdServer) Start() {
  58. s.w = wait.New()
  59. s.done = make(chan struct{})
  60. go s.run()
  61. }
  62. func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
  63. return s.Node.Step(ctx, m)
  64. }
  65. func (s *EtcdServer) run() {
  66. for {
  67. select {
  68. case <-s.Ticker:
  69. s.Node.Tick()
  70. case rd := <-s.Node.Ready():
  71. s.Save(rd.State, rd.Entries)
  72. s.Send(rd.Messages)
  73. // TODO(bmizerany): do this in the background, but take
  74. // care to apply entries in a single goroutine, and not
  75. // race them.
  76. for _, e := range rd.CommittedEntries {
  77. var r pb.Request
  78. if err := r.Unmarshal(e.Data); err != nil {
  79. panic("TODO: this is bad, what do we do about it?")
  80. }
  81. s.w.Trigger(r.Id, s.apply(r))
  82. }
  83. case <-s.done:
  84. return
  85. }
  86. }
  87. }
  88. // Stop stops the server, and shuts down the running goroutine. Stop should be
  89. // called after a Start(s), otherwise it will block forever.
  90. func (s *EtcdServer) Stop() {
  91. s.Node.Stop()
  92. close(s.done)
  93. }
  94. // Do interprets r and performs an operation on s.Store according to r.Method
  95. // and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET with
  96. // Quorum == true, r will be sent through consensus before performing its
  97. // respective operation. Do will block until an action is performed or there is
  98. // an error.
  99. func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
  100. if r.Id == 0 {
  101. panic("r.Id cannot be 0")
  102. }
  103. if r.Method == "GET" && r.Quorum {
  104. r.Method = "QGET"
  105. }
  106. switch r.Method {
  107. case "POST", "PUT", "DELETE", "QGET":
  108. data, err := r.Marshal()
  109. if err != nil {
  110. return Response{}, err
  111. }
  112. ch := s.w.Register(r.Id)
  113. s.Node.Propose(ctx, data)
  114. select {
  115. case x := <-ch:
  116. resp := x.(Response)
  117. return resp, resp.err
  118. case <-ctx.Done():
  119. s.w.Trigger(r.Id, nil) // GC wait
  120. return Response{}, ctx.Err()
  121. case <-s.done:
  122. return Response{}, ErrStopped
  123. }
  124. case "GET":
  125. switch {
  126. case r.Wait:
  127. wc, err := s.Store.Watch(r.Path, r.Recursive, false, r.Since)
  128. if err != nil {
  129. return Response{}, err
  130. }
  131. return Response{Watcher: wc}, nil
  132. default:
  133. ev, err := s.Store.Get(r.Path, r.Recursive, r.Sorted)
  134. if err != nil {
  135. return Response{}, err
  136. }
  137. return Response{Event: ev}, nil
  138. }
  139. default:
  140. return Response{}, ErrUnknownMethod
  141. }
  142. }
  143. // apply interprets r as a call to store.X and returns an Response interpreted from store.Event
  144. func (s *EtcdServer) apply(r pb.Request) Response {
  145. f := func(ev *store.Event, err error) Response {
  146. return Response{Event: ev, err: err}
  147. }
  148. expr := time.Unix(0, r.Expiration)
  149. switch r.Method {
  150. case "POST":
  151. return f(s.Store.Create(r.Path, r.Dir, r.Val, true, expr))
  152. case "PUT":
  153. exists, existsSet := getBool(r.PrevExists)
  154. switch {
  155. case existsSet:
  156. if exists {
  157. return f(s.Store.Update(r.Path, r.Val, expr))
  158. } else {
  159. return f(s.Store.Create(r.Path, r.Dir, r.Val, false, expr))
  160. }
  161. case r.PrevIndex > 0 || r.PrevValue != "":
  162. return f(s.Store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
  163. default:
  164. return f(s.Store.Set(r.Path, r.Dir, r.Val, expr))
  165. }
  166. case "DELETE":
  167. switch {
  168. case r.PrevIndex > 0 || r.PrevValue != "":
  169. return f(s.Store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
  170. default:
  171. return f(s.Store.Delete(r.Path, r.Recursive, r.Dir))
  172. }
  173. case "QGET":
  174. return f(s.Store.Get(r.Path, r.Recursive, r.Sorted))
  175. default:
  176. // This should never be reached, but just in case:
  177. return Response{err: ErrUnknownMethod}
  178. }
  179. }
  180. func getBool(v *bool) (vv bool, set bool) {
  181. if v == nil {
  182. return false, false
  183. }
  184. return *v, true
  185. }