server.go 11 KB


  1. package etcdserver
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "log"
  6. "math/rand"
  7. "sync/atomic"
  8. "time"
  9. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  10. "github.com/coreos/etcd/raft"
  11. "github.com/coreos/etcd/raft/raftpb"
  12. "github.com/coreos/etcd/store"
  13. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  14. "github.com/coreos/etcd/wait"
  15. )
  // Server-wide tunables.
const (
	// defaultSyncTimeout bounds how long the leader's periodic SYNC
	// proposal (see run and sync) may wait before its context is cancelled.
	defaultSyncTimeout = time.Second
	// DefaultSnapCount is the snapshot trigger used by Start when
	// EtcdServer.SnapCount is left at zero.
	DefaultSnapCount = 10000
)
  // Errors reported by EtcdServer.
var (
	// ErrUnknownMethod is returned by Do (and carried in the Response from
	// apply) for a request whose Method is not recognized.
	ErrUnknownMethod = errors.New("etcdserver: unknown method")
	// ErrStopped is returned by Do and configure when the server is stopped
	// while they are waiting for a proposal to be applied.
	ErrStopped = errors.New("etcdserver: server stopped")
)
  // init seeds the package-global math/rand source from the wall clock so
// that GenID produces a different sequence of IDs on each process start.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
  // SendFunc is the signature of the function EtcdServer uses to send raft
// messages to other members (see EtcdServer.Send).
type SendFunc func(m []raftpb.Message)

// SaveFunc is the signature of a function that persists a raft hard state
// together with log entries.
// NOTE(review): not referenced in this file; Storage.Save has the same shape.
type SaveFunc func(st raftpb.HardState, ents []raftpb.Entry)
  // Response is the result of a request handled by Do or applied by apply.
// The code in this file sets at most one of Event and Watcher.
type Response struct {
	// Event is the store event produced by a read or a write.
	Event *store.Event
	// Watcher is set for a non-quorum GET with Wait set.
	Watcher store.Watcher
	// err carries the store error from apply back to the Do call waiting
	// on the request ID.
	err error
}
  // Storage is the stable storage used by EtcdServer to persist raft state,
// log entries and snapshots.
type Storage interface {
	// Save function saves ents and state to the underlying stable storage.
	// Save MUST block until st and ents are on stable storage.
	Save(st raftpb.HardState, ents []raftpb.Entry)
	// SaveSnap function saves snapshot to the underlying stable storage.
	SaveSnap(snap raftpb.Snapshot)
	// TODO: WAL should be able to control cut itself. After implementing a
	// self-controlled cut, remove Cut from this interface.
	// Cut cuts out a new wal file for saving new state and entries.
	Cut() error
}
  // Server is the interface of an etcd server: it serves client requests
// (Do) and peer raft messages (Process) between Start and Stop.
type Server interface {
	// Start performs any initialization of the Server necessary for it to
	// begin serving requests. It must be called before Do or Process.
	// Start must be non-blocking; any long-running server functionality
	// should be implemented in goroutines.
	Start()
	// Stop terminates the Server and performs any necessary finalization.
	// Do and Process cannot be called after Stop has been invoked.
	Stop()
	// Do takes a request and attempts to fulfil it, returning a Response.
	Do(ctx context.Context, r pb.Request) (Response, error)
	// Process takes a raft message and applies it to the server's raft state
	// machine, respecting any timeout of the given context.
	Process(ctx context.Context, m raftpb.Message) error
}
  // RaftTimer reports the raft index and term most recently observed by a
// server.
type RaftTimer interface {
	// Index returns the latest raft index seen.
	Index() int64
	// Term returns the latest raft term seen.
	Term() int64
}
  // EtcdServer is the production implementation of the Server interface
type EtcdServer struct {
	// w holds the callers (Do, configure) waiting on a proposal ID; run
	// triggers them when the matching committed entry has been applied.
	w wait.Wait
	// done is created by Start and closed by Stop; closing it ends run and
	// unblocks pending Do/configure calls.
	done chan struct{}
	// Node is the raft node driving consensus.
	Node raft.Node
	// Store is the store that committed requests are applied to.
	Store store.Store
	// Send specifies the send function for sending msgs to members. Send
	// MUST NOT block. It is okay to drop messages, since clients should
	// timeout and reissue their messages. If Send is nil, server will
	// panic.
	Send SendFunc
	// Storage persists raft hard state, entries and snapshots.
	Storage Storage
	// Ticker drives raft's logical clock: each receive calls Node.Tick.
	Ticker <-chan time.Time
	// SyncTicker paces SYNC proposals; run listens to it only while this
	// server is leader.
	SyncTicker <-chan time.Time
	SnapCount int64 // number of entries to trigger a snapshot (0 = DefaultSnapCount, set by Start)
	// Cache of the latest raft index and raft term the server has seen
	// (written by run, read by Index/Term, both via sync/atomic).
	// NOTE(review): on 32-bit platforms sync/atomic requires 64-bit words to
	// be 64-bit aligned; confirm this field placement guarantees that.
	raftIndex int64
	raftTerm  int64
	// ClusterStore presumably holds cluster membership; it is not used in
	// this file yet (see the TODO in run).
	ClusterStore ClusterStore
}
  84. // Start prepares and starts server in a new goroutine. It is no longer safe to
  85. // modify a server's fields after it has been sent to Start.
  86. func (s *EtcdServer) Start() {
  87. if s.SnapCount == 0 {
  88. log.Printf("etcdserver: set snapshot count to default %d", DefaultSnapCount)
  89. s.SnapCount = DefaultSnapCount
  90. }
  91. s.w = wait.New()
  92. s.done = make(chan struct{})
  93. // TODO: if this is an empty log, writes all peer infos
  94. // into the first entry
  95. go s.run()
  96. }
  97. func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
  98. return s.Node.Step(ctx, m)
  99. }
  // run is the server's event loop, started by Start in its own goroutine.
// Each iteration handles exactly one of:
//   - a tick from s.Ticker, forwarded to the raft node;
//   - a Ready batch from the raft node: persist it, send its messages, apply
//     its committed entries, recover from a newer snapshot, and take a local
//     snapshot when more than s.SnapCount entries were applied since the last;
//   - a tick from syncC (non-nil only while leader), which proposes a SYNC;
//   - closure of s.done, which returns.
func (s *EtcdServer) run() {
	// syncC is s.SyncTicker while this node is leader and nil otherwise;
	// receiving from a nil channel blocks forever, which disables that case.
	var syncC <-chan time.Time
	// snapi indicates the index of the last submitted snapshot request
	// appliedi is the index of the last entry applied to s.Store, or of the
	// snapshot most recently recovered from.
	var snapi, appliedi int64
	for {
		select {
		case <-s.Ticker:
			s.Node.Tick()
		case rd := <-s.Node.Ready():
			// Storage.Save blocks until the data is on stable storage (per the
			// Storage contract), so state is persisted before messages are sent.
			// NOTE(review): SaveSnap is called for every Ready, including ones
			// with an empty snapshot; confirm the Storage implementation
			// tolerates that.
			s.Storage.Save(rd.HardState, rd.Entries)
			s.Storage.SaveSnap(rd.Snapshot)
			s.Send(rd.Messages)
			// TODO(bmizerany): do this in the background, but take
			// care to apply entries in a single goroutine, and not
			// race them.
			// TODO: apply configuration change into ClusterStore.
			for _, e := range rd.CommittedEntries {
				switch e.Type {
				case raftpb.EntryNormal:
					var r pb.Request
					if err := r.Unmarshal(e.Data); err != nil {
						panic("TODO: this is bad, what do we do about it?")
					}
					// Apply to the store and wake the Do call waiting on r.ID.
					s.w.Trigger(r.ID, s.apply(r))
				case raftpb.EntryConfChange:
					var cc raftpb.ConfChange
					if err := cc.Unmarshal(e.Data); err != nil {
						panic("TODO: this is bad, what do we do about it?")
					}
					s.Node.ApplyConfChange(cc)
					// Wake the configure call waiting on cc.ID.
					s.w.Trigger(cc.ID, nil)
				default:
					panic("unexpected entry type")
				}
				// Publish the applied position for Index()/Term() readers.
				atomic.StoreInt64(&s.raftIndex, e.Index)
				atomic.StoreInt64(&s.raftTerm, e.Term)
				appliedi = e.Index
			}
			// An incoming snapshot counts as the latest snapshot.
			if rd.Snapshot.Index > snapi {
				snapi = rd.Snapshot.Index
			}
			// recover from snapshot if it is more updated than current applied
			// NOTE(review): raftIndex/raftTerm are not updated here, so Index()
			// and Term() can lag until the next committed entry is applied.
			if rd.Snapshot.Index > appliedi {
				if err := s.Store.Recovery(rd.Snapshot.Data); err != nil {
					panic("TODO: this is bad, what do we do about it?")
				}
				appliedi = rd.Snapshot.Index
			}
			// Snapshot once more than SnapCount entries were applied since
			// the last snapshot.
			if appliedi-snapi > s.SnapCount {
				s.snapshot()
				snapi = appliedi
			}
			// When the batch carries a SoftState, enable periodic SYNC
			// proposals on the leader and disable them everywhere else.
			if rd.SoftState != nil {
				if rd.RaftState == raft.StateLeader {
					syncC = s.SyncTicker
				} else {
					syncC = nil
				}
			}
		case <-syncC:
			s.sync(defaultSyncTimeout)
		case <-s.done:
			return
		}
	}
}
  // Stop stops the server, and shuts down the running goroutine. Stop should be
// called after a Start(s), otherwise it will block forever.
//
// It stops the raft node first, then closes s.done. Closing s.done ends run
// and lets pending Do/configure calls return ErrStopped.
// NOTE(review): s.done is created only by Start, and closing a nil or
// already-closed channel panics, so call Stop exactly once, after Start.
func (s *EtcdServer) Stop() {
	s.Node.Stop()
	close(s.done)
}
  172. // Do interprets r and performs an operation on s.Store according to r.Method
  173. // and other fields. If r.Method is "POST", "PUT", "DELETE", or a "GET" with
  174. // Quorum == true, r will be sent through consensus before performing its
  175. // respective operation. Do will block until an action is performed or there is
  176. // an error.
  177. func (s *EtcdServer) Do(ctx context.Context, r pb.Request) (Response, error) {
  178. if r.ID == 0 {
  179. panic("r.Id cannot be 0")
  180. }
  181. if r.Method == "GET" && r.Quorum {
  182. r.Method = "QGET"
  183. }
  184. switch r.Method {
  185. case "POST", "PUT", "DELETE", "QGET":
  186. data, err := r.Marshal()
  187. if err != nil {
  188. return Response{}, err
  189. }
  190. ch := s.w.Register(r.ID)
  191. s.Node.Propose(ctx, data)
  192. select {
  193. case x := <-ch:
  194. resp := x.(Response)
  195. return resp, resp.err
  196. case <-ctx.Done():
  197. s.w.Trigger(r.ID, nil) // GC wait
  198. return Response{}, ctx.Err()
  199. case <-s.done:
  200. return Response{}, ErrStopped
  201. }
  202. case "GET":
  203. switch {
  204. case r.Wait:
  205. wc, err := s.Store.Watch(r.Path, r.Recursive, r.Stream, r.Since)
  206. if err != nil {
  207. return Response{}, err
  208. }
  209. return Response{Watcher: wc}, nil
  210. default:
  211. ev, err := s.Store.Get(r.Path, r.Recursive, r.Sorted)
  212. if err != nil {
  213. return Response{}, err
  214. }
  215. return Response{Event: ev}, nil
  216. }
  217. default:
  218. return Response{}, ErrUnknownMethod
  219. }
  220. }
  221. func (s *EtcdServer) AddNode(ctx context.Context, id int64, context []byte) error {
  222. cc := raftpb.ConfChange{
  223. ID: GenID(),
  224. Type: raftpb.ConfChangeAddNode,
  225. NodeID: id,
  226. Context: context,
  227. }
  228. return s.configure(ctx, cc)
  229. }
  230. func (s *EtcdServer) RemoveNode(ctx context.Context, id int64) error {
  231. cc := raftpb.ConfChange{
  232. ID: GenID(),
  233. Type: raftpb.ConfChangeRemoveNode,
  234. NodeID: id,
  235. }
  236. return s.configure(ctx, cc)
  237. }
  238. // Implement the RaftTimer interface
  239. func (s *EtcdServer) Index() int64 {
  240. return atomic.LoadInt64(&s.raftIndex)
  241. }
  242. func (s *EtcdServer) Term() int64 {
  243. return atomic.LoadInt64(&s.raftTerm)
  244. }
  245. // configure sends configuration change through consensus then performs it.
  246. // It will block until the change is performed or there is an error.
  247. func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) error {
  248. ch := s.w.Register(cc.ID)
  249. if err := s.Node.ProposeConfChange(ctx, cc); err != nil {
  250. log.Printf("configure error: %v", err)
  251. s.w.Trigger(cc.ID, nil)
  252. return err
  253. }
  254. select {
  255. case <-ch:
  256. return nil
  257. case <-ctx.Done():
  258. s.w.Trigger(cc.ID, nil) // GC wait
  259. return ctx.Err()
  260. case <-s.done:
  261. return ErrStopped
  262. }
  263. }
  264. // sync proposes a SYNC request and is non-blocking.
  265. // This makes no guarantee that the request will be proposed or performed.
  266. // The request will be cancelled after the given timeout.
  267. func (s *EtcdServer) sync(timeout time.Duration) {
  268. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  269. req := pb.Request{
  270. Method: "SYNC",
  271. ID: GenID(),
  272. Time: time.Now().UnixNano(),
  273. }
  274. data, err := req.Marshal()
  275. if err != nil {
  276. log.Printf("marshal request %#v error: %v", req, err)
  277. return
  278. }
  279. // There is no promise that node has leader when do SYNC request,
  280. // so it uses goroutine to propose.
  281. go func() {
  282. s.Node.Propose(ctx, data)
  283. cancel()
  284. }()
  285. }
  286. // publish registers server information into the cluster. The information
  287. // is the json format of the given member.
  288. // The function keeps attempting to register until it succeeds,
  289. // or its server is stopped.
  290. func (s *EtcdServer) publish(m Member, retryInterval time.Duration) {
  291. b, err := json.Marshal(m)
  292. if err != nil {
  293. log.Printf("etcdserver: json marshal error: %v", err)
  294. return
  295. }
  296. req := pb.Request{
  297. ID: GenID(),
  298. Method: "PUT",
  299. Path: m.storeKey(),
  300. Val: string(b),
  301. }
  302. for {
  303. ctx, cancel := context.WithTimeout(context.Background(), retryInterval)
  304. _, err := s.Do(ctx, req)
  305. cancel()
  306. switch err {
  307. case nil:
  308. log.Printf("etcdserver: published %+v to the cluster", m)
  309. return
  310. case ErrStopped:
  311. log.Printf("etcdserver: aborting publish because server is stopped")
  312. return
  313. default:
  314. log.Printf("etcdserver: publish error: %v", err)
  315. }
  316. }
  317. }
  318. func getExpirationTime(r *pb.Request) time.Time {
  319. var t time.Time
  320. if r.Expiration != 0 {
  321. t = time.Unix(0, r.Expiration)
  322. }
  323. return t
  324. }
  325. // apply interprets r as a call to store.X and returns a Response interpreted
  326. // from store.Event
  327. func (s *EtcdServer) apply(r pb.Request) Response {
  328. f := func(ev *store.Event, err error) Response {
  329. return Response{Event: ev, err: err}
  330. }
  331. expr := getExpirationTime(&r)
  332. switch r.Method {
  333. case "POST":
  334. return f(s.Store.Create(r.Path, r.Dir, r.Val, true, expr))
  335. case "PUT":
  336. exists, existsSet := getBool(r.PrevExist)
  337. switch {
  338. case existsSet:
  339. if exists {
  340. return f(s.Store.Update(r.Path, r.Val, expr))
  341. }
  342. return f(s.Store.Create(r.Path, r.Dir, r.Val, false, expr))
  343. case r.PrevIndex > 0 || r.PrevValue != "":
  344. return f(s.Store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, expr))
  345. default:
  346. return f(s.Store.Set(r.Path, r.Dir, r.Val, expr))
  347. }
  348. case "DELETE":
  349. switch {
  350. case r.PrevIndex > 0 || r.PrevValue != "":
  351. return f(s.Store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
  352. default:
  353. return f(s.Store.Delete(r.Path, r.Dir, r.Recursive))
  354. }
  355. case "QGET":
  356. return f(s.Store.Get(r.Path, r.Recursive, r.Sorted))
  357. case "SYNC":
  358. s.Store.DeleteExpiredKeys(time.Unix(0, r.Time))
  359. return Response{}
  360. default:
  361. // This should never be reached, but just in case:
  362. return Response{err: ErrUnknownMethod}
  363. }
  364. }
  365. // TODO: non-blocking snapshot
  366. func (s *EtcdServer) snapshot() {
  367. d, err := s.Store.Save()
  368. // TODO: current store will never fail to do a snapshot
  369. // what should we do if the store might fail?
  370. if err != nil {
  371. panic("TODO: this is bad, what do we do about it?")
  372. }
  373. s.Node.Compact(d)
  374. s.Storage.Cut()
  375. }
  // GenID returns a random, strictly positive int64 for use as a request or
// configuration-change ID. It draws from the package-global math/rand
// source and redraws whenever it gets 0, because 0 is not a valid ID.
// TODO: move the function to /id pkg maybe?
func GenID() (n int64) {
	for {
		n = rand.Int63()
		if n != 0 {
			return n
		}
	}
}
  // getBool dereferences an optional bool. It returns the pointed-to value
// and set=true when v is non-nil, or (false, false) when v is nil.
func getBool(v *bool) (vv bool, set bool) {
	set = v != nil
	if set {
		vv = *v
	}
	return vv, set
}