server.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package etcdserver
  2. import (
  3. "errors"
  4. "time"
  5. "code.google.com/p/go.net/context"
  6. "github.com/coreos/etcd/raft"
  7. "github.com/coreos/etcd/store"
  8. "github.com/coreos/etcd/wait"
  9. )
// ErrUnknownMethod is reported when a request's Method is not one of the
// methods handled by this package ("GET", "POST", "PUT", "DELETE").
var ErrUnknownMethod = errors.New("etcdserver: unknown method")
// Response is the result handed back to a caller of Server.Do.
type Response struct {
	// The last seen term raft was at when this response was built.
	Term int
	// The last seen index raft was at when this response was built.
	Index int
	// Event is the store event produced by the request: set for plain
	// (non-waiting) GETs and for store operations performed by apply.
	*store.Event
	// Watcher is set instead of Event for GET requests that have Wait set.
	*store.Watcher
	// err carries the failure, if any, encountered while applying a
	// proposed request; Do returns it next to the Response.
	err error
}
// Server connects a raft node to a store: Do proposes writes through the
// node and serves reads from the store, while Run consumes the node's Ready
// values.
type Server struct {
	// n is the raft node that Do proposes to and whose Ready channel Run drains.
	n raft.Node
	// w tracks in-flight proposals by request Id so Do can wait for their result.
	w wait.List
	// msgsc is not referenced by the code visible here.
	// NOTE(review): confirm it is still needed.
	msgsc chan raft.Message
	// st is the store that GET requests read from and watch.
	st store.Store

	// Send specifies the send function for sending msgs to peers. Send
	// MUST NOT block. It is okay to drop messages, since clients should
	// timeout and reissue their messages. If Send is nil, Server will
	// panic.
	Send func(msgs []raft.Message)

	// Save specifies the save function for saving ents to stable storage.
	// Save MUST block until st and ents are on stable storage. If Save is
	// nil, Server will panic.
	Save func(st raft.State, ents []raft.Entry)
}
  35. func (s *Server) Run(ctx context.Context) {
  36. for {
  37. select {
  38. case rd := <-s.n.Ready():
  39. s.Save(rd.State, rd.Entries)
  40. s.Send(rd.Messages)
  41. go func() {
  42. for _, e := range rd.CommittedEntries {
  43. s.apply(rd, e)
  44. }
  45. }()
  46. case <-ctx.Done():
  47. return
  48. }
  49. }
  50. }
  51. func (s *Server) Do(ctx context.Context, r Request) (Response, error) {
  52. if r.Id == 0 {
  53. panic("r.Id cannot be 0")
  54. }
  55. switch r.Method {
  56. case "POST", "PUT", "DELETE":
  57. data, err := r.Marshal()
  58. if err != nil {
  59. return Response{}, err
  60. }
  61. ch := s.w.Register(r.Id)
  62. s.n.Propose(ctx, data)
  63. select {
  64. case x := <-ch:
  65. resp := x.(Response)
  66. return resp, resp.err
  67. case <-ctx.Done():
  68. s.w.Trigger(r.Id, nil) // GC wait
  69. return Response{}, ctx.Err()
  70. }
  71. case "GET":
  72. switch {
  73. case r.Wait:
  74. wc, err := s.st.Watch(r.Path, r.Recursive, false, r.Since)
  75. if err != nil {
  76. return Response{}, err
  77. }
  78. return Response{Watcher: wc}, nil
  79. default:
  80. ev, err := s.st.Get(r.Path, r.Recursive, r.Sorted)
  81. if err != nil {
  82. return Response{}, err
  83. }
  84. return Response{Event: ev}, nil
  85. }
  86. default:
  87. return Response{}, ErrUnknownMethod
  88. }
  89. }
  90. func respond(rd Ready, ev *store.Event, err error) Response {
  91. return Response{Term: rd.Term, Index: rd.Index, Event: ev, err: err}
  92. }
  93. // apply interprets r as a call to store.X and returns an Response interpreted from store.Event
  94. func (s *Server) apply(rd Ready, e raft.Entry) Response {
  95. resp := Response{Term: rd.Term, Index: rd.Index}
  96. var r Request
  97. if resp.err = r.Unmarshal(e.Data); resp.err != nil {
  98. return resp
  99. }
  100. switch r.Method {
  101. case "POST":
  102. resp.Event, resp.err = st.Create(r.Path, r.Dir, r.Val, true, time.Unix(0, r.Expiration))
  103. return resp
  104. case "PUT":
  105. case "DELETE":
  106. default:
  107. return Response{err: ErrUnknownMethod}
  108. }
  109. }