server.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. package etcdserver
  2. import (
  3. "errors"
  4. "code.google.com/p/go.net/context"
  5. "github.com/coreos/etcd/raft"
  6. "github.com/coreos/etcd/store"
  7. "github.com/coreos/etcd/wait"
  8. )
  9. var ErrUnknownMethod = errors.New("etcdserver: unknown method")
  10. type Response struct {
  11. // The last seen term raft was at when this request was built.
  12. Term int
  13. // The last seen index raft was at when this request was built.
  14. Index int
  15. *store.Event
  16. *store.Watcher
  17. err error
  18. }
  19. type Server struct {
  20. n raft.Node
  21. w wait.List
  22. msgsc chan raft.Message
  23. st store.Store
  24. // Send specifies the send function for sending msgs to peers. Send
  25. // MUST NOT block. It is okay to drop messages, since clients should
  26. // timeout and reissue their messages. If Send is nil, Server will
  27. // panic.
  28. Send func(msgs []raft.Message)
  29. // Save specifies the save function for saving ents to stable storage.
  30. // Save MUST block until st and ents are on stable storage. If Send is
  31. // nil, Server will panic.
  32. Save func(st raft.State, ents []raft.Entry)
  33. }
  34. func (s *Server) Run(ctx context.Context) {
  35. for {
  36. select {
  37. case rd := <-s.n.Ready():
  38. s.Save(rd.State, rd.Entries)
  39. s.Send(rd.Messages)
  40. go func() {
  41. for _, e := range rd.CommittedEntries {
  42. var r Request
  43. r.Unmarshal(e.Data)
  44. s.w.Trigger(r.Id, s.apply(r))
  45. }
  46. }()
  47. case <-ctx.Done():
  48. return
  49. }
  50. }
  51. }
  52. func (s *Server) Do(ctx context.Context, r Request) (Response, error) {
  53. if r.Id == 0 {
  54. panic("r.Id cannot be 0")
  55. }
  56. switch r.Method {
  57. case "POST", "PUT", "DELETE":
  58. data, err := r.Marshal()
  59. if err != nil {
  60. return Response{}, err
  61. }
  62. ch := s.w.Register(r.Id)
  63. s.n.Propose(ctx, data)
  64. select {
  65. case x := <-ch:
  66. resp := x.(Response)
  67. return resp, resp.err
  68. case <-ctx.Done():
  69. s.w.Trigger(r.Id, nil) // GC wait
  70. return Response{}, ctx.Err()
  71. }
  72. case "GET":
  73. switch {
  74. case r.Wait:
  75. wc, err := s.st.Watch(r.Path, r.Recursive, false, r.Since)
  76. if err != nil {
  77. return Response{}, err
  78. }
  79. return Response{Watcher: wc}, nil
  80. default:
  81. ev, err := s.st.Get(r.Path, r.Recursive, r.Sorted)
  82. if err != nil {
  83. return Response{}, err
  84. }
  85. return Response{Event: ev}, nil
  86. }
  87. default:
  88. return Response{}, ErrUnknownMethod
  89. }
  90. }
  91. // apply interprets r as a call to store.X and returns an Response interpreted from store.Event
  92. func (s *Server) apply(r Request) Response {
  93. panic("not implmented")
  94. }