join_command.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. package server
  2. import (
  3. "encoding/binary"
  4. "encoding/json"
  5. etcdErr "github.com/coreos/etcd/error"
  6. "github.com/coreos/etcd/log"
  7. "github.com/coreos/etcd/third_party/github.com/goraft/raft"
  8. )
  9. func init() {
  10. raft.RegisterCommand(&JoinCommandV1{})
  11. raft.RegisterCommand(&JoinCommandV2{})
  12. }
  13. // JoinCommandV1 represents a request to join the cluster.
  14. // The command returns the join_index (Uvarint).
  15. type JoinCommandV1 struct {
  16. MinVersion int `json:"minVersion"`
  17. MaxVersion int `json:"maxVersion"`
  18. Name string `json:"name"`
  19. RaftURL string `json:"raftURL"`
  20. EtcdURL string `json:"etcdURL"`
  21. }
  22. // The name of the join command in the log
  23. func (c *JoinCommandV1) CommandName() string {
  24. return "etcd:join"
  25. }
  26. func (c *JoinCommandV1) updatePeerURL(ps *PeerServer) error {
  27. log.Debugf("Update peer URL of %v to %v", c.Name, c.RaftURL)
  28. if err := ps.registry.UpdatePeerURL(c.Name, c.RaftURL); err != nil {
  29. log.Debugf("Error while updating in registry: %s (%v)", c.Name, err)
  30. return err
  31. }
  32. // Flush commit index, so raft will replay to here when restarted
  33. ps.raftServer.FlushCommitIndex()
  34. return nil
  35. }
  36. // Join a server to the cluster
  37. func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) {
  38. ps, _ := context.Server().Context().(*PeerServer)
  39. b := make([]byte, 8)
  40. binary.PutUvarint(b, context.CommitIndex())
  41. // Make sure we're not getting a cached value from the registry.
  42. ps.registry.Invalidate(c.Name)
  43. // Check if the join command is from a previous peer, who lost all its previous log.
  44. if peerURL, ok := ps.registry.PeerURL(c.Name); ok {
  45. // If previous node restarts with different peer URL,
  46. // update its information.
  47. if peerURL != c.RaftURL {
  48. log.Infof("Rejoin with %v instead of %v from %v", c.RaftURL, peerURL, c.Name)
  49. if err := c.updatePeerURL(ps); err != nil {
  50. return []byte{0}, err
  51. }
  52. }
  53. return b, nil
  54. }
  55. // Check peer number in the cluster
  56. if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize {
  57. log.Debug("Reject join request from ", c.Name)
  58. return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMorePeer, "", context.CommitIndex())
  59. }
  60. // Add to shared peer registry.
  61. ps.registry.RegisterPeer(c.Name, c.RaftURL, c.EtcdURL)
  62. // Add peer in raft
  63. err := context.Server().AddPeer(c.Name, "")
  64. // Add peer stats
  65. if c.Name != ps.RaftServer().Name() {
  66. ps.followersStats.Followers[c.Name] = &raftFollowerStats{}
  67. ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
  68. }
  69. return b, err
  70. }
  71. func (c *JoinCommandV1) NodeName() string {
  72. return c.Name
  73. }
  74. // JoinCommandV2 represents a request to join the cluster.
  75. type JoinCommandV2 struct {
  76. MinVersion int `json:"minVersion"`
  77. MaxVersion int `json:"maxVersion"`
  78. Name string `json:"name"`
  79. PeerURL string `json:"peerURL"`
  80. ClientURL string `json:"clientURL"`
  81. }
  82. // CommandName returns the name of the command in the Raft log.
  83. func (c *JoinCommandV2) CommandName() string {
  84. return "etcd:v2:join"
  85. }
  86. func (c *JoinCommandV2) updatePeerURL(ps *PeerServer) error {
  87. log.Debugf("Update peer URL of %v to %v", c.Name, c.PeerURL)
  88. if err := ps.registry.UpdatePeerURL(c.Name, c.PeerURL); err != nil {
  89. log.Debugf("Error while updating in registry: %s (%v)", c.Name, err)
  90. return err
  91. }
  92. // Flush commit index, so raft will replay to here when restart
  93. ps.raftServer.FlushCommitIndex()
  94. return nil
  95. }
  96. // Apply attempts to join a machine to the cluster.
  97. func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) {
  98. ps, _ := context.Server().Context().(*PeerServer)
  99. var msg = joinMessageV2{
  100. Mode: PeerMode,
  101. CommitIndex: context.CommitIndex(),
  102. }
  103. // Make sure we're not getting a cached value from the registry.
  104. ps.registry.Invalidate(c.Name)
  105. // Check if the join command is from a previous peer, who lost all its previous log.
  106. if peerURL, ok := ps.registry.PeerURL(c.Name); ok {
  107. // If previous node restarts with different peer URL,
  108. // update its information.
  109. if peerURL != c.PeerURL {
  110. log.Infof("Rejoin with %v instead of %v from %v", c.PeerURL, peerURL, c.Name)
  111. if err := c.updatePeerURL(ps); err != nil {
  112. return []byte{0}, err
  113. }
  114. }
  115. return json.Marshal(msg)
  116. }
  117. // Check peer number in the cluster.
  118. if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize {
  119. log.Debug("Join as standby ", c.Name)
  120. ps.registry.RegisterStandby(c.Name, c.PeerURL, c.ClientURL)
  121. msg.Mode = StandbyMode
  122. return json.Marshal(msg)
  123. }
  124. // Remove it as a standby if it is one.
  125. if ps.registry.StandbyExists(c.Name) {
  126. ps.registry.UnregisterStandby(c.Name)
  127. }
  128. // Add to shared peer registry.
  129. ps.registry.RegisterPeer(c.Name, c.PeerURL, c.ClientURL)
  130. // Add peer in raft
  131. if err := context.Server().AddPeer(c.Name, ""); err != nil {
  132. b, _ := json.Marshal(msg)
  133. return b, err
  134. }
  135. // Add peer stats
  136. if c.Name != ps.RaftServer().Name() {
  137. ps.followersStats.Followers[c.Name] = &raftFollowerStats{}
  138. ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
  139. }
  140. return json.Marshal(msg)
  141. }
  142. func (c *JoinCommandV2) NodeName() string {
  143. return c.Name
  144. }
  145. type joinMessageV2 struct {
  146. CommitIndex uint64 `json:"commitIndex"`
  147. Mode Mode `json:"mode"`
  148. }