join_command.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. package server
  2. import (
  3. "encoding/binary"
  4. etcdErr "github.com/coreos/etcd/error"
  5. "github.com/coreos/etcd/log"
  6. "github.com/coreos/etcd/store"
  7. "github.com/coreos/go-raft"
  8. )
  9. func init() {
  10. raft.RegisterCommand(&JoinCommand{})
  11. }
  12. // The JoinCommand adds a node to the cluster.
  13. type JoinCommand struct {
  14. RaftVersion string `json:"raftVersion"`
  15. Name string `json:"name"`
  16. RaftURL string `json:"raftURL"`
  17. EtcdURL string `json:"etcdURL"`
  18. MaxClusterSize int `json:"maxClusterSize"`
  19. }
  20. func NewJoinCommand(version, name, raftUrl, etcdUrl string, maxClusterSize int) *JoinCommand {
  21. return &JoinCommand{
  22. RaftVersion: version,
  23. Name: name,
  24. RaftURL: raftUrl,
  25. EtcdURL: etcdUrl,
  26. MaxClusterSize: maxClusterSize,
  27. }
  28. }
  29. // The name of the join command in the log
  30. func (c *JoinCommand) CommandName() string {
  31. return "etcd:join"
  32. }
  33. // Join a server to the cluster
  34. func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) {
  35. s, _ := server.StateMachine().(*store.Store)
  36. ps, _ := server.Context().(*PeerServer)
  37. b := make([]byte, 8)
  38. binary.PutUvarint(b, server.CommitIndex())
  39. // Check if the join command is from a previous machine, who lost all its previous log.
  40. if _, ok := ps.registry.URL(c.Name); ok {
  41. return b, nil
  42. }
  43. // Check machine number in the cluster
  44. if ps.registry.Count() == c.MaxClusterSize {
  45. log.Debug("Reject join request from ", c.Name)
  46. return []byte{0}, etcdErr.NewError(etcdErr.EcodeNoMoreMachine, "", server.CommitIndex(), server.Term())
  47. }
  48. // Add to shared machine registry.
  49. ps.registry.Register(c.Name, c.RaftVersion, c.RaftURL, c.EtcdURL, server.CommitIndex(), server.Term())
  50. // Add peer in raft
  51. err := server.AddPeer(c.Name, "")
  52. // Add peer stats
  53. if c.Name != ps.Name() {
  54. ps.followersStats.Followers[c.Name] = &raftFollowerStats{}
  55. ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
  56. }
  57. return b, err
  58. }
  59. func (c *JoinCommand) NodeName() string {
  60. return c.Name
  61. }