join_command.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package server
  2. import (
  3. "encoding/binary"
  4. "encoding/json"
  5. etcdErr "github.com/coreos/etcd/error"
  6. "github.com/coreos/etcd/log"
  7. "github.com/coreos/etcd/third_party/github.com/goraft/raft"
  8. )
  9. func init() {
  10. raft.RegisterCommand(&JoinCommandV1{})
  11. raft.RegisterCommand(&JoinCommandV2{})
  12. }
  13. // JoinCommandV1 represents a request to join the cluster.
  14. // The command returns the join_index (Uvarint).
  15. type JoinCommandV1 struct {
  16. MinVersion int `json:"minVersion"`
  17. MaxVersion int `json:"maxVersion"`
  18. Name string `json:"name"`
  19. RaftURL string `json:"raftURL"`
  20. EtcdURL string `json:"etcdURL"`
  21. }
  22. // The name of the join command in the log
  23. func (c *JoinCommandV1) CommandName() string {
  24. return "etcd:join"
  25. }
  26. // Join a server to the cluster
  27. func (c *JoinCommandV1) Apply(context raft.Context) (interface{}, error) {
  28. ps, _ := context.Server().Context().(*PeerServer)
  29. b := make([]byte, 8)
  30. binary.PutUvarint(b, context.CommitIndex())
  31. // Make sure we're not getting a cached value from the registry.
  32. ps.registry.Invalidate(c.Name)
  33. // Check if the join command is from a previous peer, who lost all its previous log.
  34. if _, ok := ps.registry.ClientURL(c.Name); ok {
  35. return b, nil
  36. }
  37. // Check peer number in the cluster
  38. if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize {
  39. log.Debug("Reject join request from ", c.Name)
  40. return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMorePeer, "", context.CommitIndex())
  41. }
  42. // Add to shared peer registry.
  43. ps.registry.RegisterPeer(c.Name, c.RaftURL, c.EtcdURL)
  44. // Add peer in raft
  45. err := context.Server().AddPeer(c.Name, "")
  46. // Add peer stats
  47. if c.Name != ps.RaftServer().Name() {
  48. ps.followersStats.Followers[c.Name] = &raftFollowerStats{}
  49. ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
  50. }
  51. return b, err
  52. }
  53. func (c *JoinCommandV1) NodeName() string {
  54. return c.Name
  55. }
  56. // JoinCommandV2 represents a request to join the cluster.
  57. type JoinCommandV2 struct {
  58. MinVersion int `json:"minVersion"`
  59. MaxVersion int `json:"maxVersion"`
  60. Name string `json:"name"`
  61. PeerURL string `json:"peerURL"`
  62. ClientURL string `json:"clientURL"`
  63. }
  64. // CommandName returns the name of the command in the Raft log.
  65. func (c *JoinCommandV2) CommandName() string {
  66. return "etcd:v2:join"
  67. }
  68. // Apply attempts to join a machine to the cluster.
  69. func (c *JoinCommandV2) Apply(context raft.Context) (interface{}, error) {
  70. ps, _ := context.Server().Context().(*PeerServer)
  71. var msg = joinMessageV2{
  72. Mode: PeerMode,
  73. CommitIndex: context.CommitIndex(),
  74. }
  75. // Make sure we're not getting a cached value from the registry.
  76. ps.registry.Invalidate(c.Name)
  77. // Check if the join command is from a previous peer, who lost all its previous log.
  78. if _, ok := ps.registry.ClientURL(c.Name); ok {
  79. return json.Marshal(msg)
  80. }
  81. // Check peer number in the cluster.
  82. if ps.registry.PeerCount() >= ps.ClusterConfig().ActiveSize {
  83. log.Debug("Join as proxy ", c.Name)
  84. ps.registry.RegisterProxy(c.Name, c.PeerURL, c.ClientURL)
  85. msg.Mode = ProxyMode
  86. return json.Marshal(msg)
  87. }
  88. // Remove it as a proxy if it is one.
  89. if ps.registry.ProxyExists(c.Name) {
  90. ps.registry.UnregisterProxy(c.Name)
  91. }
  92. // Add to shared peer registry.
  93. ps.registry.RegisterPeer(c.Name, c.PeerURL, c.ClientURL)
  94. // Add peer in raft
  95. if err := context.Server().AddPeer(c.Name, ""); err != nil {
  96. b, _ := json.Marshal(msg)
  97. return b, err
  98. }
  99. // Add peer stats
  100. if c.Name != ps.RaftServer().Name() {
  101. ps.followersStats.Followers[c.Name] = &raftFollowerStats{}
  102. ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
  103. }
  104. return json.Marshal(msg)
  105. }
  106. func (c *JoinCommandV2) NodeName() string {
  107. return c.Name
  108. }
  109. type joinMessageV2 struct {
  110. CommitIndex uint64 `json:"commitIndex"`
  111. Mode Mode `json:"mode"`
  112. }