etcd.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642
  1. package main
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "crypto/x509"
  6. "encoding/json"
  7. "encoding/pem"
  8. "flag"
  9. "fmt"
  10. "github.com/coreos/etcd/store"
  11. "github.com/coreos/etcd/web"
  12. "github.com/coreos/go-raft"
  13. "io/ioutil"
  14. "net"
  15. "net/http"
  16. "net/url"
  17. "os"
  18. "os/signal"
  19. "runtime/pprof"
  20. "strings"
  21. "time"
  22. )
  23. //------------------------------------------------------------------------------
  24. //
  25. // Initialization
  26. //
  27. //------------------------------------------------------------------------------
  28. var verbose bool
  29. var veryVerbose bool
  30. var machines string
  31. var machinesFile string
  32. var cluster []string
  33. var argInfo Info
  34. var dirPath string
  35. var force bool
  36. var maxSize int
  37. var snapshot bool
  38. var retryTimes int
  39. var maxClusterSize int
  40. var cpuprofile string
  41. func init() {
  42. flag.BoolVar(&verbose, "v", false, "verbose logging")
  43. flag.BoolVar(&veryVerbose, "vv", false, "very verbose logging")
  44. flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma")
  45. flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma")
  46. flag.StringVar(&argInfo.Name, "n", "", "the node name (required)")
  47. flag.StringVar(&argInfo.EtcdURL, "c", "127.0.0.1:4001", "the hostname:port for etcd client communication")
  48. flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the hostname:port for raft server communication")
  49. flag.StringVar(&argInfo.WebURL, "w", "", "the hostname:port of web interface")
  50. flag.StringVar(&argInfo.RaftTLS.CAFile, "serverCAFile", "", "the path of the CAFile")
  51. flag.StringVar(&argInfo.RaftTLS.CertFile, "serverCert", "", "the cert file of the server")
  52. flag.StringVar(&argInfo.RaftTLS.KeyFile, "serverKey", "", "the key file of the server")
  53. flag.StringVar(&argInfo.EtcdTLS.CAFile, "clientCAFile", "", "the path of the client CAFile")
  54. flag.StringVar(&argInfo.EtcdTLS.CertFile, "clientCert", "", "the cert file of the client")
  55. flag.StringVar(&argInfo.EtcdTLS.KeyFile, "clientKey", "", "the key file of the client")
  56. flag.StringVar(&dirPath, "d", ".", "the directory to store log and snapshot")
  57. flag.BoolVar(&force, "f", false, "force new node configuration if existing is found (WARNING: data loss!)")
  58. flag.BoolVar(&snapshot, "snapshot", false, "open or close snapshot")
  59. flag.IntVar(&maxSize, "m", 1024, "the max size of result buffer")
  60. flag.IntVar(&retryTimes, "r", 3, "the max retry attempts when trying to join a cluster")
  61. flag.IntVar(&maxClusterSize, "maxsize", 9, "the max size of the cluster")
  62. flag.StringVar(&cpuprofile, "cpuprofile", "", "write cpu profile to file")
  63. }
  // Raft timing parameters applied to the raft server in startRaft.
  const (
  	// ElectionTimeout is passed to raftServer.SetElectionTimeout.
  	ElectionTimeout = 200 * time.Millisecond
  	// HeartbeatTimeout is passed to raftServer.SetHeartbeatTimeout.
  	HeartbeatTimeout = 50 * time.Millisecond
  )

  // Networking and retry policy.
  const (
  	// HTTPTimeout bounds how long dialTimeout waits when opening an internal
  	// raft HTTP connection. The original author considered net/http's default
  	// (noted as 45 seconds) too long for this use.
  	HTTPTimeout = 10 * time.Second

  	// RetryInterval is the number of seconds between rounds of attempts to
  	// join an existing cluster. It is an untyped constant, so it can both
  	// scale time.Second and be printed with %d.
  	RetryInterval = 10
  )
  73. //------------------------------------------------------------------------------
  74. //
  75. // Typedefs
  76. //
  77. //------------------------------------------------------------------------------
  // TLSInfo names the files holding the TLS material for one listener:
  // certificate, private key, and (optionally) the CA used to verify peers.
  // It is serialized as part of Info, so the JSON keys (which deliberately
  // match the Go field names) must not change.
  type TLSInfo struct {
  	CertFile string `json:"CertFile"`
  	KeyFile  string `json:"KeyFile"`
  	CAFile   string `json:"CAFile"`
  }
  83. type Info struct {
  84. Name string `json:"name"`
  85. RaftURL string `json:"raftURL"`
  86. EtcdURL string `json:"etcdURL"`
  87. WebURL string `json:"webURL"`
  88. RaftTLS TLSInfo `json:"raftTLS"`
  89. EtcdTLS TLSInfo `json:"etcdTLS"`
  90. }
  // TLSConfig is what tlsConfigFromInfo derives from a TLSInfo.
  //   - Scheme is "http" when no TLS files were given, otherwise "https".
  //   - Server is the config the HTTP listeners serve with.
  //   - Client is the config used for outgoing raft connections.
  type TLSConfig struct {
  	Scheme string
  	Server tls.Config
  	Client tls.Config
  }
  96. //------------------------------------------------------------------------------
  97. //
  98. // Variables
  99. //
  100. //------------------------------------------------------------------------------
  101. var raftServer *raft.Server
  102. var raftTransporter transporter
  103. var etcdStore *store.Store
  104. var info *Info
  105. //------------------------------------------------------------------------------
  106. //
  107. // Functions
  108. //
  109. //------------------------------------------------------------------------------
  110. // sanitizeURL will cleanup a host string in the format hostname:port and
  111. // attach a schema.
  112. func sanitizeURL(host string, defaultScheme string) string {
  113. // Blank URLs are fine input, just return it
  114. if len(host) == 0 {
  115. return host
  116. }
  117. p, err := url.Parse(host)
  118. if err != nil {
  119. fatal(err)
  120. }
  121. // Make sure the host is in Host:Port format
  122. _, _, err = net.SplitHostPort(host)
  123. if err != nil {
  124. fatal(err)
  125. }
  126. p = &url.URL{Host: host, Scheme: defaultScheme}
  127. return p.String()
  128. }
  129. //--------------------------------------
  130. // Main
  131. //--------------------------------------
  132. func main() {
  133. flag.Parse()
  134. if cpuprofile != "" {
  135. f, err := os.Create(cpuprofile)
  136. if err != nil {
  137. fatal(err)
  138. }
  139. pprof.StartCPUProfile(f)
  140. defer pprof.StopCPUProfile()
  141. c := make(chan os.Signal, 1)
  142. signal.Notify(c, os.Interrupt)
  143. go func() {
  144. for sig := range c {
  145. fmt.Printf("captured %v, stopping profiler and exiting..", sig)
  146. pprof.StopCPUProfile()
  147. os.Exit(1)
  148. }
  149. }()
  150. }
  151. if veryVerbose {
  152. verbose = true
  153. raft.SetLogLevel(raft.Debug)
  154. }
  155. if machines != "" {
  156. cluster = strings.Split(machines, ",")
  157. } else if machinesFile != "" {
  158. b, err := ioutil.ReadFile(machinesFile)
  159. if err != nil {
  160. fatalf("Unable to read the given machines file: %s", err)
  161. }
  162. cluster = strings.Split(string(b), ",")
  163. }
  164. raftTLSConfig, ok := tlsConfigFromInfo(argInfo.RaftTLS)
  165. if !ok {
  166. fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
  167. }
  168. etcdTLSConfig, ok := tlsConfigFromInfo(argInfo.EtcdTLS)
  169. if !ok {
  170. fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
  171. }
  172. argInfo.Name = strings.TrimSpace(argInfo.Name)
  173. if argInfo.Name == "" {
  174. fatal("ERROR: server name required. e.g. '-n=server_name'")
  175. }
  176. argInfo.RaftURL = sanitizeURL(argInfo.RaftURL, raftTLSConfig.Scheme)
  177. argInfo.EtcdURL = sanitizeURL(argInfo.EtcdURL, etcdTLSConfig.Scheme)
  178. argInfo.WebURL = sanitizeURL(argInfo.WebURL, "http")
  179. // Setup commands.
  180. registerCommands()
  181. // Read server info from file or grab it from user.
  182. if err := os.MkdirAll(dirPath, 0744); err != nil {
  183. fatalf("Unable to create path: %s", err)
  184. }
  185. info = getInfo(dirPath)
  186. // Create etcd key-value store
  187. etcdStore = store.CreateStore(maxSize)
  188. snapConf = newSnapshotConf()
  189. startRaft(raftTLSConfig)
  190. if argInfo.WebURL != "" {
  191. // start web
  192. argInfo.WebURL = sanitizeURL(argInfo.WebURL, "http")
  193. go webHelper()
  194. go web.Start(raftServer, argInfo.WebURL)
  195. }
  196. startEtcdTransport(*info, etcdTLSConfig.Scheme, etcdTLSConfig.Server)
  197. }
  198. // Start the raft server
  199. func startRaft(tlsConfig TLSConfig) {
  200. var err error
  201. raftName := info.Name
  202. // Create transporter for raft
  203. raftTransporter = newTransporter(tlsConfig.Scheme, tlsConfig.Client)
  204. // Create raft server
  205. raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil)
  206. if err != nil {
  207. fatal(err)
  208. }
  209. // LoadSnapshot
  210. if snapshot {
  211. err = raftServer.LoadSnapshot()
  212. if err == nil {
  213. debugf("%s finished load snapshot", raftServer.Name())
  214. } else {
  215. debug(err)
  216. }
  217. }
  218. raftServer.SetElectionTimeout(ElectionTimeout)
  219. raftServer.SetHeartbeatTimeout(HeartbeatTimeout)
  220. raftServer.Start()
  221. if raftServer.IsLogEmpty() {
  222. // start as a leader in a new cluster
  223. if len(cluster) == 0 {
  224. time.Sleep(time.Millisecond * 20)
  225. // leader need to join self as a peer
  226. for {
  227. command := &JoinCommand{
  228. Name: raftServer.Name(),
  229. RaftURL: argInfo.RaftURL,
  230. EtcdURL: argInfo.EtcdURL,
  231. }
  232. _, err := raftServer.Do(command)
  233. if err == nil {
  234. break
  235. }
  236. }
  237. debugf("%s start as a leader", raftServer.Name())
  238. // start as a follower in a existing cluster
  239. } else {
  240. time.Sleep(time.Millisecond * 20)
  241. for i := 0; i < retryTimes; i++ {
  242. success := false
  243. for _, machine := range cluster {
  244. if len(machine) == 0 {
  245. continue
  246. }
  247. err = joinCluster(raftServer, machine)
  248. if err != nil {
  249. if err.Error() == errors[103] {
  250. fmt.Println(err)
  251. os.Exit(1)
  252. }
  253. debugf("cannot join to cluster via machine %s %s", machine, err)
  254. } else {
  255. success = true
  256. break
  257. }
  258. }
  259. if success {
  260. break
  261. }
  262. warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval)
  263. time.Sleep(time.Second * RetryInterval)
  264. }
  265. if err != nil {
  266. fatalf("Cannot join the cluster via given machines after %x retries", retryTimes)
  267. }
  268. debugf("%s success join to the cluster", raftServer.Name())
  269. }
  270. } else {
  271. // rejoin the previous cluster
  272. debugf("%s restart as a follower", raftServer.Name())
  273. }
  274. // open the snapshot
  275. if snapshot {
  276. go monitorSnapshot()
  277. }
  278. // start to response to raft requests
  279. go startRaftTransport(*info, tlsConfig.Scheme, tlsConfig.Server)
  280. }
  281. // Create transporter using by raft server
  282. // Create http or https transporter based on
  283. // whether the user give the server cert and key
  284. func newTransporter(scheme string, tlsConf tls.Config) transporter {
  285. t := transporter{}
  286. t.scheme = scheme
  287. tr := &http.Transport{
  288. Dial: dialTimeout,
  289. }
  290. if scheme == "https" {
  291. tr.TLSClientConfig = &tlsConf
  292. tr.DisableCompression = true
  293. }
  294. t.client = &http.Client{Transport: tr}
  295. return t
  296. }
  297. // Dial with timeout
  298. func dialTimeout(network, addr string) (net.Conn, error) {
  299. return net.DialTimeout(network, addr, HTTPTimeout)
  300. }
  301. // Start to listen and response raft command
  302. func startRaftTransport(info Info, scheme string, tlsConf tls.Config) {
  303. u, _ := url.Parse(info.RaftURL)
  304. fmt.Printf("raft server [%s] listening on %s\n", info.Name, u)
  305. raftMux := http.NewServeMux()
  306. server := &http.Server{
  307. Handler: raftMux,
  308. TLSConfig: &tlsConf,
  309. Addr: u.Host,
  310. }
  311. // internal commands
  312. raftMux.HandleFunc("/name", NameHttpHandler)
  313. raftMux.HandleFunc("/join", JoinHttpHandler)
  314. raftMux.HandleFunc("/vote", VoteHttpHandler)
  315. raftMux.HandleFunc("/log", GetLogHttpHandler)
  316. raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler)
  317. raftMux.HandleFunc("/snapshot", SnapshotHttpHandler)
  318. raftMux.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
  319. raftMux.HandleFunc("/etcdURL", EtcdURLHttpHandler)
  320. if scheme == "http" {
  321. fatal(server.ListenAndServe())
  322. } else {
  323. fatal(server.ListenAndServeTLS(info.RaftTLS.CertFile, info.RaftTLS.KeyFile))
  324. }
  325. }
  326. // Start to listen and response client command
  327. func startEtcdTransport(info Info, scheme string, tlsConf tls.Config) {
  328. u, _ := url.Parse(info.EtcdURL)
  329. fmt.Printf("etcd server [%s] listening on %s\n", info.Name, u)
  330. etcdMux := http.NewServeMux()
  331. server := &http.Server{
  332. Handler: etcdMux,
  333. TLSConfig: &tlsConf,
  334. Addr: u.Host,
  335. }
  336. // external commands
  337. etcdMux.HandleFunc("/"+version+"/keys/", Multiplexer)
  338. etcdMux.HandleFunc("/"+version+"/watch/", WatchHttpHandler)
  339. etcdMux.HandleFunc("/leader", LeaderHttpHandler)
  340. etcdMux.HandleFunc("/machines", MachinesHttpHandler)
  341. etcdMux.HandleFunc("/", VersionHttpHandler)
  342. etcdMux.HandleFunc("/stats", StatsHttpHandler)
  343. etcdMux.HandleFunc("/test/", TestHttpHandler)
  344. if scheme == "http" {
  345. fatal(server.ListenAndServe())
  346. } else {
  347. fatal(server.ListenAndServeTLS(info.EtcdTLS.CertFile, info.EtcdTLS.KeyFile))
  348. }
  349. }
  350. //--------------------------------------
  351. // Config
  352. //--------------------------------------
  353. func tlsConfigFromInfo(info TLSInfo) (t TLSConfig, ok bool) {
  354. var keyFile, certFile, CAFile string
  355. var tlsCert tls.Certificate
  356. var err error
  357. t.Scheme = "http"
  358. keyFile = info.KeyFile
  359. certFile = info.CertFile
  360. CAFile = info.CAFile
  361. // If the user do not specify key file, cert file and
  362. // CA file, the type will be HTTP
  363. if keyFile == "" && certFile == "" && CAFile == "" {
  364. return t, true
  365. }
  366. // both the key and cert must be present
  367. if keyFile == "" || certFile == "" {
  368. return t, false
  369. }
  370. tlsCert, err = tls.LoadX509KeyPair(certFile, keyFile)
  371. if err != nil {
  372. fatal(err)
  373. }
  374. t.Scheme = "https"
  375. t.Server.ClientAuth, t.Server.ClientCAs = newCertPool(CAFile)
  376. t.Client.Certificates = []tls.Certificate{tlsCert}
  377. t.Client.InsecureSkipVerify = true
  378. return t, true
  379. }
  380. func parseInfo(path string) *Info {
  381. file, err := os.Open(path)
  382. if err != nil {
  383. return nil
  384. }
  385. info := &Info{}
  386. defer file.Close()
  387. content, err := ioutil.ReadAll(file)
  388. if err != nil {
  389. fatalf("Unable to read info: %v", err)
  390. return nil
  391. }
  392. if err = json.Unmarshal(content, &info); err != nil {
  393. fatalf("Unable to parse info: %v", err)
  394. return nil
  395. }
  396. return info
  397. }
  398. // Get the server info from previous conf file
  399. // or from the user
  400. func getInfo(path string) *Info {
  401. // Read in the server info if available.
  402. infoPath := fmt.Sprintf("%s/info", path)
  403. // Delete the old configuration if exist
  404. if force {
  405. logPath := fmt.Sprintf("%s/log", path)
  406. confPath := fmt.Sprintf("%s/conf", path)
  407. snapshotPath := fmt.Sprintf("%s/snapshot", path)
  408. os.Remove(infoPath)
  409. os.Remove(logPath)
  410. os.Remove(confPath)
  411. os.RemoveAll(snapshotPath)
  412. }
  413. info := parseInfo(infoPath)
  414. if info != nil {
  415. fmt.Printf("Found node configuration in '%s'. Ignoring flags.\n", infoPath)
  416. return info
  417. }
  418. info = &argInfo
  419. // Write to file.
  420. content, _ := json.MarshalIndent(info, "", " ")
  421. content = []byte(string(content) + "\n")
  422. if err := ioutil.WriteFile(infoPath, content, 0644); err != nil {
  423. fatalf("Unable to write info to file: %v", err)
  424. }
  425. fmt.Printf("Wrote node configuration to '%s'.\n", infoPath)
  426. return info
  427. }
  428. // newCertPool creates x509 certPool and corresponding Auth Type.
  429. // If the given CAfile is valid, add the cert into the pool and verify the clients'
  430. // certs against the cert in the pool.
  431. // If the given CAfile is empty, do not verify the clients' cert.
  432. // If the given CAfile is not valid, fatal.
  433. func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool) {
  434. if CAFile == "" {
  435. return tls.NoClientCert, nil
  436. }
  437. pemByte, _ := ioutil.ReadFile(CAFile)
  438. block, pemByte := pem.Decode(pemByte)
  439. cert, err := x509.ParseCertificate(block.Bytes)
  440. if err != nil {
  441. fatal(err)
  442. }
  443. certPool := x509.NewCertPool()
  444. certPool.AddCert(cert)
  445. return tls.RequireAndVerifyClientCert, certPool
  446. }
  447. // Send join requests to the leader.
  448. func joinCluster(s *raft.Server, raftURL string) error {
  449. var b bytes.Buffer
  450. command := &JoinCommand{
  451. Name: s.Name(),
  452. RaftURL: info.RaftURL,
  453. EtcdURL: info.EtcdURL,
  454. }
  455. json.NewEncoder(&b).Encode(command)
  456. // t must be ok
  457. t, ok := raftServer.Transporter().(transporter)
  458. if !ok {
  459. panic("wrong type")
  460. }
  461. joinURL := url.URL{Host: raftURL, Scheme: raftTransporter.scheme, Path: "/join"}
  462. debugf("Send Join Request to %s", raftURL)
  463. resp, err := t.Post(joinURL.String(), &b)
  464. for {
  465. if err != nil {
  466. return fmt.Errorf("Unable to join: %v", err)
  467. }
  468. if resp != nil {
  469. defer resp.Body.Close()
  470. if resp.StatusCode == http.StatusOK {
  471. return nil
  472. }
  473. if resp.StatusCode == http.StatusTemporaryRedirect {
  474. address := resp.Header.Get("Location")
  475. debugf("Send Join Request to %s", address)
  476. json.NewEncoder(&b).Encode(command)
  477. resp, err = t.Post(address, &b)
  478. } else if resp.StatusCode == http.StatusBadRequest {
  479. debug("Reach max number machines in the cluster")
  480. return fmt.Errorf(errors[103])
  481. } else {
  482. return fmt.Errorf("Unable to join")
  483. }
  484. }
  485. }
  486. return fmt.Errorf("Unable to join: %v", err)
  487. }
  488. // Register commands to raft server
  489. func registerCommands() {
  490. raft.RegisterCommand(&JoinCommand{})
  491. raft.RegisterCommand(&SetCommand{})
  492. raft.RegisterCommand(&GetCommand{})
  493. raft.RegisterCommand(&DeleteCommand{})
  494. raft.RegisterCommand(&WatchCommand{})
  495. raft.RegisterCommand(&TestAndSetCommand{})
  496. }