main.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. package main
  2. import (
  3. "errors"
  4. "flag"
  5. "fmt"
  6. "log"
  7. "net/http"
  8. "os"
  9. "path"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/coreos/etcd/etcdserver"
  14. "github.com/coreos/etcd/etcdserver/etcdhttp"
  15. "github.com/coreos/etcd/proxy"
  16. "github.com/coreos/etcd/raft"
  17. "github.com/coreos/etcd/store"
  18. "github.com/coreos/etcd/wal"
  19. )
  // privateDirMode is the permission mode for directories created by this
  // program: only the owner can make/remove files inside the directory.
  const privateDirMode = 0700
  24. var (
  25. fid = flag.String("id", "0x1", "ID of this server")
  26. timeout = flag.Duration("timeout", 10*time.Second, "Request Timeout")
  27. paddr = flag.String("peer-bind-addr", ":7001", "Peer service address (e.g., ':7001')")
  28. dir = flag.String("data-dir", "", "Path to the data directory")
  29. proxyMode = flag.Bool("proxy-mode", false, "Forward HTTP requests to peers, do not participate in raft.")
  30. peers = &etcdhttp.Peers{}
  31. addrs = &Addrs{}
  32. )
  33. func init() {
  34. flag.Var(peers, "peers", "your peers")
  35. flag.Var(addrs, "bind-addr", "List of HTTP service addresses (e.g., '127.0.0.1:4001,10.0.0.1:8080')")
  36. peers.Set("0x1=localhost:8080")
  37. addrs.Set("127.0.0.1:4001")
  38. }
  39. func main() {
  40. flag.Parse()
  41. getFlagsFromEnv()
  42. if *proxyMode {
  43. startProxy()
  44. } else {
  45. startEtcd()
  46. }
  47. // Block indefinitely
  48. <-make(chan struct{})
  49. }
  50. // startEtcd launches the etcd server and HTTP handlers for client/server communication.
  51. func startEtcd() {
  52. id, err := strconv.ParseInt(*fid, 0, 64)
  53. if err != nil {
  54. log.Fatal(err)
  55. }
  56. if id == raft.None {
  57. log.Fatalf("etcd: cannot use None(%d) as etcdserver id", raft.None)
  58. }
  59. if peers.Pick(id) == "" {
  60. log.Fatalf("%#x=<addr> must be specified in peers", id)
  61. }
  62. if *dir == "" {
  63. *dir = fmt.Sprintf("%v_etcd_data", *fid)
  64. log.Printf("main: no data-dir is given, using default data-dir ./%s", *dir)
  65. }
  66. if err := os.MkdirAll(*dir, privateDirMode); err != nil {
  67. log.Fatalf("main: cannot create data directory: %v", err)
  68. }
  69. n, w := startRaft(id, peers.IDs(), path.Join(*dir, "wal"))
  70. s := &etcdserver.EtcdServer{
  71. Store: store.New(),
  72. Node: n,
  73. Save: w.Save,
  74. Send: etcdhttp.Sender(*peers),
  75. Ticker: time.Tick(100 * time.Millisecond),
  76. SyncTicker: time.Tick(500 * time.Millisecond),
  77. }
  78. s.Start()
  79. ch := etcdhttp.NewClientHandler(s, *peers, *timeout)
  80. ph := etcdhttp.NewPeerHandler(s)
  81. // Start the peer server in a goroutine
  82. go func() {
  83. log.Print("Listening for peers on ", *paddr)
  84. log.Fatal(http.ListenAndServe(*paddr, ph))
  85. }()
  86. // Start a client server goroutine for each listen address
  87. for _, addr := range *addrs {
  88. addr := addr
  89. go func() {
  90. log.Print("Listening for client requests on ", addr)
  91. log.Fatal(http.ListenAndServe(addr, ch))
  92. }()
  93. }
  94. }
  95. // startRaft starts a raft node from the given wal dir.
  96. // If the wal dir does not exist, startRaft will start a new raft node.
  97. // If the wal dir exists, startRaft will restart the previous raft node.
  98. // startRaft returns the started raft node and the opened wal.
  99. func startRaft(id int64, peerIDs []int64, waldir string) (raft.Node, *wal.WAL) {
  100. if !wal.Exist(waldir) {
  101. w, err := wal.Create(waldir)
  102. if err != nil {
  103. log.Fatal(err)
  104. }
  105. n := raft.Start(id, peerIDs, 10, 1)
  106. return n, w
  107. }
  108. // restart a node from previous wal
  109. // TODO(xiangli): check snapshot; not open from one
  110. w, err := wal.OpenAtIndex(waldir, 0)
  111. if err != nil {
  112. log.Fatal(err)
  113. }
  114. wid, st, ents, err := w.ReadAll()
  115. // TODO(xiangli): save/recovery nodeID?
  116. if wid != 0 {
  117. log.Fatalf("unexpected nodeid %d: nodeid should always be zero until we save nodeid into wal", wid)
  118. }
  119. if err != nil {
  120. log.Fatal(err)
  121. }
  122. n := raft.Restart(id, peerIDs, 10, 1, st, ents)
  123. return n, w
  124. }
  125. // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.
  126. func startProxy() {
  127. ph, err := proxy.NewHandler((*peers).Endpoints())
  128. if err != nil {
  129. log.Fatal(err)
  130. }
  131. // Start a proxy server goroutine for each listen address
  132. for _, addr := range *addrs {
  133. addr := addr
  134. go func() {
  135. log.Print("Listening for client requests on ", addr)
  136. log.Fatal(http.ListenAndServe(addr, ph))
  137. }()
  138. }
  139. }
  // Addrs implements the flag.Value interface to allow users to define multiple
  // listen addresses on the command-line.
  type Addrs []string

  // Set parses a command line set of listen addresses, formatted like:
  // 127.0.0.1:7001,unix:///var/run/etcd.sock,10.1.1.1:8080
  //
  // Whitespace around each address is trimmed and blank entries are dropped.
  // Set returns an error, and leaves the receiver unchanged, when no address
  // remains. On success the previous contents are replaced.
  func (as *Addrs) Set(s string) error {
  	// TODO(jonboulle): validate things.
  	fields := strings.Split(s, ",")
  	parsed := make([]string, 0, len(fields))
  	for _, a := range fields {
  		// strings.Split yields an empty element for "" and for stray commas;
  		// skip those so an empty listen address is never stored.
  		if a = strings.TrimSpace(a); a != "" {
  			parsed = append(parsed, a)
  		}
  	}
  	if len(parsed) == 0 {
  		return errors.New("no valid addresses given!")
  	}
  	*as = parsed
  	return nil
  }

  // String returns the addresses joined by commas, in the form accepted by Set.
  func (as *Addrs) String() string {
  	return strings.Join(*as, ",")
  }
  // getFlagsFromEnv parses all registered flags in the global flagset,
  // and if they are not already set it attempts to set their values from
  // environment variables. Environment variables take the name of the flag but
  // are UPPERCASE, have the prefix "ETCD_", and any dashes are replaced by
  // underscores - for example: some-flag => ETCD_SOME_FLAG
  //
  // Unset or empty environment variables are skipped. A value the flag rejects
  // is logged and processing continues with the remaining flags.
  func getFlagsFromEnv() {
  	alreadySet := make(map[string]bool)
  	flag.Visit(func(f *flag.Flag) {
  		alreadySet[f.Name] = true
  	})
  	flag.VisitAll(func(f *flag.Flag) {
  		// Flags given on the command line take precedence.
  		if alreadySet[f.Name] {
  			return
  		}
  		key := "ETCD_" + strings.ToUpper(strings.Replace(f.Name, "-", "_", -1))
  		val := os.Getenv(key)
  		if val == "" {
  			return
  		}
  		// Surface rejected values instead of dropping the error. Whether the
  		// flag keeps its previous value after a failed Set depends on the
  		// flag's Value implementation.
  		if err := flag.Set(f.Name, val); err != nil {
  			log.Printf("main: invalid value %q for %s: %v", val, key, err)
  		}
  	})
  }