remove_command.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package server
  2. import (
  3. "encoding/binary"
  4. "encoding/json"
  5. "os"
  6. "github.com/coreos/etcd/log"
  7. "github.com/coreos/etcd/third_party/github.com/coreos/raft"
  8. )
  9. func init() {
  10. raft.RegisterCommand(&RemoveCommandV1{})
  11. raft.RegisterCommand(&RemoveCommandV2{})
  12. }
  13. // The RemoveCommandV1 removes a server from the cluster.
  14. type RemoveCommandV1 struct {
  15. Name string `json:"name"`
  16. }
  17. // The name of the remove command in the log
  18. func (c *RemoveCommandV1) CommandName() string {
  19. return "etcd:remove"
  20. }
  21. // Remove a server from the cluster
  22. func (c *RemoveCommandV1) Apply(context raft.Context) (interface{}, error) {
  23. ps, _ := context.Server().Context().(*PeerServer)
  24. // If this is a proxy then remove it and exit.
  25. if ps.registry.ProxyExists(c.Name) {
  26. return []byte{0}, ps.registry.UnregisterProxy(c.Name)
  27. }
  28. // Remove node from the shared registry.
  29. err := ps.registry.UnregisterPeer(c.Name)
  30. // Delete from stats
  31. delete(ps.followersStats.Followers, c.Name)
  32. if err != nil {
  33. log.Debugf("Error while unregistering: %s (%v)", c.Name, err)
  34. return []byte{0}, err
  35. }
  36. // Remove peer in raft
  37. err = context.Server().RemovePeer(c.Name)
  38. if err != nil {
  39. log.Debugf("Unable to remove peer: %s (%v)", c.Name, err)
  40. return []byte{0}, err
  41. }
  42. if c.Name == context.Server().Name() {
  43. // the removed node is this node
  44. // if the node is not replaying the previous logs
  45. // and the node has sent out a join request in this
  46. // start. It is sure that this node received a new remove
  47. // command and need to be removed
  48. if context.CommitIndex() > ps.joinIndex && ps.joinIndex != 0 {
  49. log.Debugf("server [%s] is removed", context.Server().Name())
  50. os.Exit(0)
  51. } else {
  52. // else ignore remove
  53. log.Debugf("ignore previous remove command.")
  54. }
  55. }
  56. b := make([]byte, 8)
  57. binary.PutUvarint(b, context.CommitIndex())
  58. return b, err
  59. }
  60. // RemoveCommandV2 represents a command to remove a machine from the server.
  61. type RemoveCommandV2 struct {
  62. Name string `json:"name"`
  63. }
  64. // CommandName returns the name of the command.
  65. func (c *RemoveCommandV2) CommandName() string {
  66. return "etcd:v2:remove"
  67. }
  68. // Apply removes the given machine from the cluster.
  69. func (c *RemoveCommandV2) Apply(context raft.Context) (interface{}, error) {
  70. ps, _ := context.Server().Context().(*PeerServer)
  71. ret, _ := json.Marshal(removeMessageV2{CommitIndex: context.CommitIndex()})
  72. // If this is a proxy then remove it and exit.
  73. if ps.registry.ProxyExists(c.Name) {
  74. if err := ps.registry.UnregisterProxy(c.Name); err != nil {
  75. return nil, err
  76. }
  77. return ret, nil
  78. }
  79. // Remove node from the shared registry.
  80. err := ps.registry.UnregisterPeer(c.Name)
  81. // Delete from stats
  82. delete(ps.followersStats.Followers, c.Name)
  83. if err != nil {
  84. log.Debugf("Error while unregistering: %s (%v)", c.Name, err)
  85. return nil, err
  86. }
  87. // Remove peer in raft
  88. if err := context.Server().RemovePeer(c.Name); err != nil {
  89. log.Debugf("Unable to remove peer: %s (%v)", c.Name, err)
  90. return nil, err
  91. }
  92. if c.Name == context.Server().Name() {
  93. // the removed node is this node
  94. // if the node is not replaying the previous logs
  95. // and the node has sent out a join request in this
  96. // start. It is sure that this node received a new remove
  97. // command and need to be removed
  98. if context.CommitIndex() > ps.joinIndex && ps.joinIndex != 0 {
  99. log.Debugf("server [%s] is removed", context.Server().Name())
  100. os.Exit(0)
  101. } else {
  102. // else ignore remove
  103. log.Debugf("ignore previous remove command.")
  104. }
  105. }
  106. return ret, nil
  107. }
  108. type removeMessageV2 struct {
  109. CommitIndex uint64 `json:"commitIndex"`
  110. }