join_command.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. package server
  2. import (
  3. "encoding/binary"
  4. "encoding/json"
  5. etcdErr "github.com/coreos/etcd/error"
  6. "github.com/coreos/etcd/log"
  7. "github.com/coreos/etcd/third_party/github.com/goraft/raft"
  8. )
  9. func init() {
  10. raft.RegisterCommand(&JoinCommandV1{})
  11. raft.RegisterCommand(&JoinCommandV2{})
  12. }
  13. // JoinCommandV1 represents a request to join the cluster.
  14. // The command returns the join_index (Uvarint).
  15. type JoinCommandV1 struct {
  16. MinVersion int `json:"minVersion"`
  17. MaxVersion int `json:"maxVersion"`
  18. Name string `json:"name"`
  19. RaftURL string `json:"raftURL"`
  20. EtcdURL string `json:"etcdURL"`
  21. }
  22. // The name of the join command in the log
  23. func (c *JoinCommandV1) CommandName() string {
  24. return "etcd:join"
  25. }
  26. func (c *JoinCommandV1) updatePeerURL(ps *PeerServer) error {
  27. log.Debugf("Update peer URL of %v to %v", c.Name, c.RaftURL)
  28. if err := ps.registry.UpdatePeerURL(c.Name, c.RaftURL); err != nil {
  29. log.Debugf("Error while updating in registry: %s (%v)", c.Name, err)
  30. return err
  31. }
  32. // Flush commit index, so raft will replay to here when restarted
  33. ps.raftServer.FlushCommitIndex()
  34. return nil
  35. }
  36. // Join a server to the cluster
  37. func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) {
  38. ps, _ := context.Server().Context().(*PeerServer)
  39. b := make([]byte, 8)
  40. binary.PutUvarint(b, context.CommitIndex())
  41. // Make sure we're not getting a cached value from the registry.
  42. ps.registry.Invalidate(c.Name)
  43. // Check if the join command is from a previous peer, who lost all its previous log.
  44. if peerURL, ok := ps.registry.PeerURL(c.Name); ok {
  45. // If previous node restarts with different peer URL,
  46. // update its information.
  47. if peerURL != c.RaftURL {
  48. log.Infof("Rejoin with %v instead of %v from %v", c.RaftURL, peerURL, c.Name)
  49. if err := c.updatePeerURL(ps); err != nil {
  50. return []byte{0}, err
  51. }
  52. }
  53. return b, nil
  54. }
  55. // Check if the join command adds an instance that collides with existing one on peer URL.
  56. peerURLs := ps.registry.PeerURLs(ps.raftServer.Leader(), c.Name)
  57. for _, peerURL := range peerURLs {
  58. if peerURL == c.RaftURL {
  59. log.Warnf("%v tries to join the cluster with existing URL %v", c.Name, c.EtcdURL)
  60. return []byte{0}, etcdErr.NewError(etcdErr.EcodeExistingPeerAddr, c.EtcdURL, context.CommitIndex())
  61. }
  62. }
  63. // Check peer number in the cluster
  64. if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize {
  65. log.Debug("Reject join request from ", c.Name)
  66. return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMorePeer, "", context.CommitIndex())
  67. }
  68. // Add to shared peer registry.
  69. ps.registry.RegisterPeer(c.Name, c.RaftURL, c.EtcdURL)
  70. // Add peer in raft
  71. err := context.Server().AddPeer(c.Name, "")
  72. // Add peer stats
  73. if c.Name != ps.RaftServer().Name() {
  74. ps.followersStats.Followers[c.Name] = &raftFollowerStats{}
  75. ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
  76. }
  77. return b, err
  78. }
  79. func (c *JoinCommandV1) NodeName() string {
  80. return c.Name
  81. }
  82. // JoinCommandV2 represents a request to join the cluster.
  83. type JoinCommandV2 struct {
  84. MinVersion int `json:"minVersion"`
  85. MaxVersion int `json:"maxVersion"`
  86. Name string `json:"name"`
  87. PeerURL string `json:"peerURL"`
  88. ClientURL string `json:"clientURL"`
  89. }
  90. // CommandName returns the name of the command in the Raft log.
  91. func (c *JoinCommandV2) CommandName() string {
  92. return "etcd:v2:join"
  93. }
  94. func (c *JoinCommandV2) updatePeerURL(ps *PeerServer) error {
  95. log.Debugf("Update peer URL of %v to %v", c.Name, c.PeerURL)
  96. if err := ps.registry.UpdatePeerURL(c.Name, c.PeerURL); err != nil {
  97. log.Debugf("Error while updating in registry: %s (%v)", c.Name, err)
  98. return err
  99. }
  100. // Flush commit index, so raft will replay to here when restart
  101. ps.raftServer.FlushCommitIndex()
  102. return nil
  103. }
  104. // Apply attempts to join a machine to the cluster.
  105. func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) {
  106. ps, _ := context.Server().Context().(*PeerServer)
  107. var msg = joinMessageV2{
  108. Mode: PeerMode,
  109. CommitIndex: context.CommitIndex(),
  110. }
  111. // Make sure we're not getting a cached value from the registry.
  112. ps.registry.Invalidate(c.Name)
  113. // Check if the join command is from a previous peer, who lost all its previous log.
  114. if peerURL, ok := ps.registry.PeerURL(c.Name); ok {
  115. // If previous node restarts with different peer URL,
  116. // update its information.
  117. if peerURL != c.PeerURL {
  118. log.Infof("Rejoin with %v instead of %v from %v", c.PeerURL, peerURL, c.Name)
  119. if err := c.updatePeerURL(ps); err != nil {
  120. return []byte{0}, err
  121. }
  122. }
  123. return json.Marshal(msg)
  124. }
  125. // Check if the join command adds an instance that collides with existing one on peer URL.
  126. peerURLs := ps.registry.PeerURLs(ps.raftServer.Leader(), c.Name)
  127. for _, peerURL := range peerURLs {
  128. if peerURL == c.PeerURL {
  129. log.Warnf("%v tries to join the cluster with existing URL %v", c.Name, c.PeerURL)
  130. return []byte{0}, etcdErr.NewError(etcdErr.EcodeExistingPeerAddr, c.PeerURL, context.CommitIndex())
  131. }
  132. }
  133. // Check peer number in the cluster.
  134. if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize {
  135. log.Debug("Join as standby ", c.Name)
  136. ps.registry.RegisterStandby(c.Name, c.PeerURL, c.ClientURL)
  137. msg.Mode = StandbyMode
  138. return json.Marshal(msg)
  139. }
  140. // Remove it as a standby if it is one.
  141. if ps.registry.StandbyExists(c.Name) {
  142. ps.registry.UnregisterStandby(c.Name)
  143. }
  144. // Add to shared peer registry.
  145. ps.registry.RegisterPeer(c.Name, c.PeerURL, c.ClientURL)
  146. // Add peer in raft
  147. if err := context.Server().AddPeer(c.Name, ""); err != nil {
  148. b, _ := json.Marshal(msg)
  149. return b, err
  150. }
  151. // Add peer stats
  152. if c.Name != ps.RaftServer().Name() {
  153. ps.followersStats.Followers[c.Name] = &raftFollowerStats{}
  154. ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
  155. }
  156. return json.Marshal(msg)
  157. }
  158. func (c *JoinCommandV2) NodeName() string {
  159. return c.Name
  160. }
  161. type joinMessageV2 struct {
  162. CommitIndex uint64 `json:"commitIndex"`
  163. Mode Mode `json:"mode"`
  164. }