server.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. package etcdserver
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "io"
  6. "log"
  7. "time"
  8. crand "crypto/rand"
  9. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  10. "github.com/coreos/etcd/raft"
  11. "github.com/coreos/etcd/raft/raftpb"
  12. "github.com/coreos/etcd/store"
  13. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  14. "github.com/coreos/etcd/wait"
  15. )
// defaultSyncTimeout is the timeout the run loop passes to sync each time
// the sync ticker fires.
const defaultSyncTimeout = time.Second
var (
	// ErrUnknownMethod is reported when a request's Method is not one of the
	// methods the server handles.
	ErrUnknownMethod = errors.New("etcdserver: unknown method")
	// ErrStopped is returned by Do when the server is stopped while the
	// request is still waiting for its result.
	ErrStopped = errors.New("etcdserver: server stopped")
)
// SendFunc is the signature of a function that sends a batch of raft
// messages; it is the type of EtcdServer.Send.
type SendFunc func(m []raftpb.Message)

// SaveFunc is the signature of a function that saves a raft hard state
// together with a batch of entries.
// NOTE(review): EtcdServer.Save spells out this same signature instead of
// using SaveFunc — consider unifying the two.
type SaveFunc func(st raftpb.HardState, ents []raftpb.Entry)
// Response is the result of a request handled by the server. Do fills in
// Watcher for a waiting GET and Event for every other successful request.
type Response struct {
	// Event is the store event produced by the request, if any.
	Event *store.Event
	// Watcher is set only for GET requests with Wait == true.
	Watcher store.Watcher
	// err carries the store error produced while applying a request through
	// consensus; Do hands it back to the caller as its error result.
	err error
}
// Server describes the operations of an etcd server: lifecycle control
// (Start, Stop), client requests (Do) and incoming raft messages (Process).
type Server interface {
	// Start performs any initialization of the Server necessary for it to
	// begin serving requests. It must be called before Do or Process.
	// Start must be non-blocking; any long-running server functionality
	// should be implemented in goroutines.
	Start()
	// Stop terminates the Server and performs any necessary finalization.
	// Do and Process cannot be called after Stop has been invoked.
	Stop()
	// Do takes a request and attempts to fulfil it, returning a Response.
	Do(ctx context.Context, r pb.Request) (Response, error)
	// Process takes a raft message and applies it to the server's raft state
	// machine, respecting any timeout of the given context.
	Process(ctx context.Context, m raftpb.Message) error
}
// EtcdServer is the production implementation of the Server interface.
// The exported fields are supplied by the caller and must not be modified
// once the server has been passed to Start; w and done are created by Start.
type EtcdServer struct {
	// w tracks requests waiting on consensus, keyed by request id: Do
	// registers an id and the run loop triggers it once the corresponding
	// committed entry has been applied.
	w wait.Wait
	// done is closed by Stop; it ends the run loop and releases Do calls
	// that are still waiting.
	done chan struct{}

	// Node is the raft node the server drives: the run loop ticks it and
	// consumes its Ready batches, while Do, sync and Process feed it
	// proposals and messages.
	Node raft.Node
	// Store is the store that requests are served from and applied to.
	Store store.Store

	// Send specifies the send function for sending msgs to peers. Send
	// MUST NOT block. It is okay to drop messages, since clients should
	// timeout and reissue their messages. If Send is nil, server will
	// panic.
	Send SendFunc

	// Save specifies the save function for saving ents to stable storage.
	// Save MUST block until st and ents are on stable storage. If Save is
	// nil, server will panic.
	Save func(st raftpb.HardState, ents []raftpb.Entry)

	// Ticker drives the raft node: every value received makes the run loop
	// call Node.Tick.
	Ticker <-chan time.Time
	// SyncTicker drives SYNC proposals: every value received makes the run
	// loop call sync, but only while the node reports itself as leader.
	SyncTicker <-chan time.Time
}
  61. // Start prepares and starts server in a new goroutine. It is no longer safe to
  62. // modify a server's fields after it has been sent to Start.
  63. func (s *EtcdServer) Start() {
  64. s.w = wait.New()
  65. s.done = make(chan struct{})
  66. go s.run()
  67. }
// Process hands the raft message m to the raft node by calling Node.Step
// with ctx, and returns whatever error Step reports.
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
	return s.Node.Step(ctx, m)
}
  71. func (s *EtcdServer) run() {
  72. var syncC <-chan time.Time
  73. for {
  74. select {
  75. case <-s.Ticker:
  76. s.Node.Tick()
  77. case rd := <-s.Node.Ready():
  78. s.Save(rd.HardState, rd.Entries)
  79. s.Send(rd.Messages)
  80. // TODO(bmizerany): do this in the background, but take
  81. // care to apply entries in a single goroutine, and not
  82. // race them.
  83. for _, e := range rd.CommittedEntries {
  84. var r pb.Request
  85. if err := r.Unmarshal(e.Data); err != nil {
  86. panic("TODO: this is bad, what do we do about it?")
  87. }
  88. s.w.Trigger(r.Id, s.apply(r))
  89. }
  90. if rd.SoftState != nil {
  91. if rd.RaftState == raft.StateLeader {
  92. syncC = s.SyncTicker
  93. } else {
  94. syncC = nil
  95. }
  96. }
  97. case <-syncC:
  98. s.sync(defaultSyncTimeout)
  99. case <-s.done:
  100. return
  101. }
  102. }
  103. }
// Stop stops the server, and shuts down the running goroutine. Stop should be
// called after a Start(s), otherwise it will block forever.
//
// It stops the raft node first and then closes s.done, which makes the run
// loop return and makes any Do call still waiting on consensus return
// ErrStopped.
// NOTE(review): a second call would close s.done again, and closing an
// already-closed channel panics — confirm callers invoke Stop only once.
func (s *EtcdServer) Stop() {
	s.Node.Stop()
	close(s.done)
}
  110. // Do interprets r and performs an operation on s.Store according to r.Method
  111. // and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with
  112. // Quorum == true, r will be sent through consensus before performing its
  113. // respective operation. Do will block until an action is performed or there is
  114. // an error.
  115. func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
  116. if r.Id == 0 {
  117. panic("r.Id cannot be 0")
  118. }
  119. if r.Method == "GET" && r.Quorum {
  120. r.Method = "QGET"
  121. }
  122. switch r.Method {
  123. case "POST", "PUT", "DELETE", "QGET":
  124. data, err := r.Marshal()
  125. if err != nil {
  126. return Response{}, err
  127. }
  128. ch := s.w.Register(r.Id)
  129. s.Node.Propose(ctx, data)
  130. select {
  131. case x := <-ch:
  132. resp := x.(Response)
  133. return resp, resp.err
  134. case <-ctx.Done():
  135. s.w.Trigger(r.Id, nil) // GC wait
  136. return Response{}, ctx.Err()
  137. case <-s.done:
  138. return Response{}, ErrStopped
  139. }
  140. case "GET":
  141. switch {
  142. case r.Wait:
  143. wc, err := s.Store.Watch(r.Path, r.Recursive, false, r.Since)
  144. if err != nil {
  145. return Response{}, err
  146. }
  147. return Response{Watcher: wc}, nil
  148. default:
  149. ev, err := s.Store.Get(r.Path, r.Recursive, r.Sorted)
  150. if err != nil {
  151. return Response{}, err
  152. }
  153. return Response{Event: ev}, nil
  154. }
  155. default:
  156. return Response{}, ErrUnknownMethod
  157. }
  158. }
  159. // sync proposes a SYNC request and is non-blocking.
  160. // This makes no guarantee that the request will be proposed or performed.
  161. // The request will be cancelled after the given timeout.
  162. func (s *EtcdServer) sync(timeout time.Duration) {
  163. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  164. req := pb.Request{
  165. Method: "SYNC",
  166. Id: GenID(),
  167. Time: time.Now().UnixNano(),
  168. }
  169. data, err := req.Marshal()
  170. if err != nil {
  171. log.Printf("marshal request %#v error: %v", req, err)
  172. return
  173. }
  174. // There is no promise that node has leader when do SYNC request,
  175. // so it uses goroutine to propose.
  176. go func() {
  177. s.Node.Propose(ctx, data)
  178. cancel()
  179. }()
  180. }
  181. // apply interprets r as a call to store.X and returns an Response interpreted from store.Event
  182. func (s *EtcdServer) apply(r pb.Request) Response {
  183. f := func(ev *store.Event, err error) Response {
  184. return Response{Event: ev, err: err}
  185. }
  186. expr := time.Unix(0, r.Expiration)
  187. switch r.Method {
  188. case "POST":
  189. return f(s.Store.Create(r.Path, r.Dir, r.Val, true, expr))
  190. case "PUT":
  191. exists, existsSet := getBool(r.PrevExists)
  192. switch {
  193. case existsSet:
  194. if exists {
  195. return f(s.Store.Update(r.Path, r.Val, expr))
  196. } else {
  197. return f(s.Store.Create(r.Path, r.Dir, r.Val, false, expr))
  198. }
  199. case r.PrevIndex > 0 || r.PrevValue != "":
  200. return f(s.Store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
  201. default:
  202. return f(s.Store.Set(r.Path, r.Dir, r.Val, expr))
  203. }
  204. case "DELETE":
  205. switch {
  206. case r.PrevIndex > 0 || r.PrevValue != "":
  207. return f(s.Store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
  208. default:
  209. return f(s.Store.Delete(r.Path, r.Recursive, r.Dir))
  210. }
  211. case "QGET":
  212. return f(s.Store.Get(r.Path, r.Recursive, r.Sorted))
  213. case "SYNC":
  214. s.Store.DeleteExpiredKeys(time.Unix(0, r.Time))
  215. return Response{}
  216. default:
  217. // This should never be reached, but just in case:
  218. return Response{err: ErrUnknownMethod}
  219. }
  220. }
  221. // TODO: move the function to /id pkg maybe?
  222. // GenID generates a random id that is not equal to 0.
  223. func GenID() int64 {
  224. for {
  225. b := make([]byte, 8)
  226. if _, err := io.ReadFull(crand.Reader, b); err != nil {
  227. panic(err) // really bad stuff happened
  228. }
  229. n := int64(binary.BigEndian.Uint64(b))
  230. if n != 0 {
  231. return n
  232. }
  233. }
  234. }
  235. func getBool(v *bool) (vv bool, set bool) {
  236. if v == nil {
  237. return false, false
  238. }
  239. return *v, true
  240. }