// command.go

  1. /*
  2. Copyright 2013 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package main
  14. import (
  15. "encoding/binary"
  16. "encoding/json"
  17. "fmt"
  18. "os"
  19. "path"
  20. "time"
  21. etcdErr "github.com/coreos/etcd/error"
  22. "github.com/coreos/etcd/store"
  23. "github.com/coreos/go-raft"
  24. )
  25. const commandPrefix = "etcd:"
  26. func commandName(name string) string {
  27. return commandPrefix + name
  28. }
  29. // A command represents an action to be taken on the replicated state machine.
  30. type Command interface {
  31. CommandName() string
  32. Apply(server *raft.Server) (interface{}, error)
  33. }
  34. // Set command
  35. type SetCommand struct {
  36. Key string `json:"key"`
  37. Value string `json:"value"`
  38. ExpireTime time.Time `json:"expireTime"`
  39. }
  40. // The name of the set command in the log
  41. func (c *SetCommand) CommandName() string {
  42. return commandName("set")
  43. }
  44. // Set the key-value pair
  45. func (c *SetCommand) Apply(server *raft.Server) (interface{}, error) {
  46. return etcdStore.Set(c.Key, c.Value, c.ExpireTime, server.CommitIndex())
  47. }
  48. // TestAndSet command
  49. type TestAndSetCommand struct {
  50. Key string `json:"key"`
  51. Value string `json:"value"`
  52. PrevValue string `json: prevValue`
  53. ExpireTime time.Time `json:"expireTime"`
  54. }
  55. // The name of the testAndSet command in the log
  56. func (c *TestAndSetCommand) CommandName() string {
  57. return commandName("testAndSet")
  58. }
  59. // Set the key-value pair if the current value of the key equals to the given prevValue
  60. func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) {
  61. return etcdStore.TestAndSet(c.Key, c.PrevValue, c.Value, c.ExpireTime, server.CommitIndex())
  62. }
  63. // Get command
  64. type GetCommand struct {
  65. Key string `json:"key"`
  66. }
  67. // The name of the get command in the log
  68. func (c *GetCommand) CommandName() string {
  69. return commandName("get")
  70. }
  71. // Get the value of key
  72. func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) {
  73. return etcdStore.Get(c.Key)
  74. }
  75. // Delete command
  76. type DeleteCommand struct {
  77. Key string `json:"key"`
  78. }
  79. // The name of the delete command in the log
  80. func (c *DeleteCommand) CommandName() string {
  81. return commandName("delete")
  82. }
  83. // Delete the key
  84. func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) {
  85. return etcdStore.Delete(c.Key, server.CommitIndex())
  86. }
  87. // Watch command
  88. type WatchCommand struct {
  89. Key string `json:"key"`
  90. SinceIndex uint64 `json:"sinceIndex"`
  91. }
  92. // The name of the watch command in the log
  93. func (c *WatchCommand) CommandName() string {
  94. return commandName("watch")
  95. }
  96. func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
  97. // create a new watcher
  98. watcher := store.NewWatcher()
  99. // add to the watchers list
  100. etcdStore.AddWatcher(c.Key, watcher, c.SinceIndex)
  101. // wait for the notification for any changing
  102. res := <-watcher.C
  103. if res == nil {
  104. return nil, fmt.Errorf("Clearing watch")
  105. }
  106. return json.Marshal(res)
  107. }
  108. // JoinCommand
  109. type JoinCommand struct {
  110. RaftVersion string `json:"raftVersion"`
  111. Name string `json:"name"`
  112. RaftURL string `json:"raftURL"`
  113. EtcdURL string `json:"etcdURL"`
  114. }
  115. func newJoinCommand() *JoinCommand {
  116. return &JoinCommand{
  117. RaftVersion: r.version,
  118. Name: r.name,
  119. RaftURL: r.url,
  120. EtcdURL: e.url,
  121. }
  122. }
  123. // The name of the join command in the log
  124. func (c *JoinCommand) CommandName() string {
  125. return commandName("join")
  126. }
  127. // Join a server to the cluster
  128. func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
  129. // check if the join command is from a previous machine, who lost all its previous log.
  130. response, _ := etcdStore.RawGet(path.Join("_etcd/machines", c.Name))
  131. b := make([]byte, 8)
  132. binary.PutUvarint(b, raftServer.CommitIndex())
  133. if response != nil {
  134. return b, nil
  135. }
  136. // check machine number in the cluster
  137. num := machineNum()
  138. if num == maxClusterSize {
  139. debug("Reject join request from ", c.Name)
  140. return []byte{0}, etcdErr.NewError(103, "")
  141. }
  142. addNameToURL(c.Name, c.RaftVersion, c.RaftURL, c.EtcdURL)
  143. // add peer in raft
  144. err := raftServer.AddPeer(c.Name, "")
  145. // add machine in etcd storage
  146. key := path.Join("_etcd/machines", c.Name)
  147. value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion)
  148. etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex())
  149. // add peer stats
  150. if c.Name != r.Name() {
  151. r.followersStats.Followers[c.Name] = &raftFollowerStats{}
  152. r.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
  153. }
  154. return b, err
  155. }
  156. func (c *JoinCommand) NodeName() string {
  157. return c.Name
  158. }
  159. // RemoveCommand
  160. type RemoveCommand struct {
  161. Name string `json:"name"`
  162. }
  163. // The name of the remove command in the log
  164. func (c *RemoveCommand) CommandName() string {
  165. return commandName("remove")
  166. }
  167. // Remove a server from the cluster
  168. func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) {
  169. // remove machine in etcd storage
  170. key := path.Join("_etcd/machines", c.Name)
  171. _, err := etcdStore.Delete(key, raftServer.CommitIndex())
  172. // delete from stats
  173. delete(r.followersStats.Followers, c.Name)
  174. if err != nil {
  175. return []byte{0}, err
  176. }
  177. // remove peer in raft
  178. err = raftServer.RemovePeer(c.Name)
  179. if err != nil {
  180. return []byte{0}, err
  181. }
  182. if c.Name == raftServer.Name() {
  183. // the removed node is this node
  184. // if the node is not replaying the previous logs
  185. // and the node has sent out a join request in this
  186. // start. It is sure that this node received a new remove
  187. // command and need to be removed
  188. if raftServer.CommitIndex() > r.joinIndex && r.joinIndex != 0 {
  189. debugf("server [%s] is removed", raftServer.Name())
  190. os.Exit(0)
  191. } else {
  192. // else ignore remove
  193. debugf("ignore previous remove command.")
  194. }
  195. }
  196. b := make([]byte, 8)
  197. binary.PutUvarint(b, raftServer.CommitIndex())
  198. return b, err
  199. }