etcd.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639
  1. package main
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "crypto/x509"
  6. "encoding/json"
  7. "encoding/pem"
  8. "flag"
  9. "fmt"
  10. "github.com/coreos/etcd/store"
  11. "github.com/coreos/etcd/web"
  12. "github.com/coreos/go-raft"
  13. "io/ioutil"
  14. "net"
  15. "net/http"
  16. "net/url"
  17. "os"
  18. "os/signal"
  19. "runtime/pprof"
  20. "strings"
  21. "time"
  22. )
  23. //------------------------------------------------------------------------------
  24. //
  25. // Initialization
  26. //
  27. //------------------------------------------------------------------------------
  28. var verbose bool
  29. var veryVerbose bool
  30. var machines string
  31. var machinesFile string
  32. var cluster []string
  33. var argInfo Info
  34. var dirPath string
  35. var force bool
  36. var maxSize int
  37. var snapshot bool
  38. var retryTimes int
  39. var maxClusterSize int
  40. var cpuprofile string
  41. func init() {
  42. flag.BoolVar(&verbose, "v", false, "verbose logging")
  43. flag.BoolVar(&veryVerbose, "vv", false, "very verbose logging")
  44. flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma")
  45. flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma")
  46. flag.StringVar(&argInfo.Name, "n", "default-name", "the node name (required)")
  47. flag.StringVar(&argInfo.EtcdURL, "c", "127.0.0.1:4001", "the hostname:port for etcd client communication")
  48. flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the hostname:port for raft server communication")
  49. flag.StringVar(&argInfo.WebURL, "w", "", "the hostname:port of web interface")
  50. flag.StringVar(&argInfo.RaftTLS.CAFile, "serverCAFile", "", "the path of the CAFile")
  51. flag.StringVar(&argInfo.RaftTLS.CertFile, "serverCert", "", "the cert file of the server")
  52. flag.StringVar(&argInfo.RaftTLS.KeyFile, "serverKey", "", "the key file of the server")
  53. flag.StringVar(&argInfo.EtcdTLS.CAFile, "clientCAFile", "", "the path of the client CAFile")
  54. flag.StringVar(&argInfo.EtcdTLS.CertFile, "clientCert", "", "the cert file of the client")
  55. flag.StringVar(&argInfo.EtcdTLS.KeyFile, "clientKey", "", "the key file of the client")
  56. flag.StringVar(&dirPath, "d", ".", "the directory to store log and snapshot")
  57. flag.BoolVar(&force, "f", false, "force new node configuration if existing is found (WARNING: data loss!)")
  58. flag.BoolVar(&snapshot, "snapshot", false, "open or close snapshot")
  59. flag.IntVar(&maxSize, "m", 1024, "the max size of result buffer")
  60. flag.IntVar(&retryTimes, "r", 3, "the max retry attempts when trying to join a cluster")
  61. flag.IntVar(&maxClusterSize, "maxsize", 9, "the max size of the cluster")
  62. flag.StringVar(&cpuprofile, "cpuprofile", "", "write cpu profile to file")
  63. }
  // Raft timing, applied to the raft server in startRaft.
  const (
  	ELECTIONTIMEOUT  = 200 * time.Millisecond
  	HEARTBEATTIMEOUT = 50 * time.Millisecond
  )

  // Networking and retry settings.
  const (
  	// HTTPTIMEOUT is the dial timeout for internal raft HTTP connections.
  	// The original timeout for http is 45 seconds, which is too long for
  	// our usage.
  	HTTPTIMEOUT = 10 * time.Second

  	// RETRYINTERVAL is the number of seconds to wait between two rounds of
  	// attempts to join a cluster.
  	RETRYINTERVAL = 10
  )
  73. //------------------------------------------------------------------------------
  74. //
  75. // Typedefs
  76. //
  77. //------------------------------------------------------------------------------
  type (
  	// TLSInfo names the files that hold the TLS material for one listener:
  	// the certificate, its private key and an optional CA certificate.
  	TLSInfo struct {
  		CertFile string `json:"CertFile"`
  		KeyFile  string `json:"KeyFile"`
  		CAFile   string `json:"CAFile"`
  	}

  	// Info describes a node: its name, the addresses it serves on and the
  	// TLS files for the raft and etcd listeners. It is the structure that
  	// getInfo writes to, and parseInfo reads from, the info file as JSON.
  	Info struct {
  		Name    string `json:"name"`
  		RaftURL string `json:"raftURL"`
  		EtcdURL string `json:"etcdURL"`
  		WebURL  string `json:"webURL"`

  		RaftTLS TLSInfo `json:"raftTLS"`
  		EtcdTLS TLSInfo `json:"etcdTLS"`
  	}
  )
  91. //------------------------------------------------------------------------------
  92. //
  93. // Variables
  94. //
  95. //------------------------------------------------------------------------------
  96. var raftServer *raft.Server
  97. var raftTransporter transporter
  98. var etcdStore *store.Store
  99. var info *Info
  100. //------------------------------------------------------------------------------
  101. //
  102. // Functions
  103. //
  104. //------------------------------------------------------------------------------
  105. // sanitizeURL will cleanup a host string in the format hostname:port and
  106. // attach a schema.
  107. func sanitizeURL(host string, defaultScheme string) string {
  108. // Blank URLs are fine input, just return it
  109. if len(host) == 0 {
  110. return host
  111. }
  112. p, err := url.Parse(host)
  113. if err != nil {
  114. fatal(err)
  115. }
  116. // Make sure the host is in Host:Port format
  117. _, _, err = net.SplitHostPort(host)
  118. if err != nil {
  119. fatal(err)
  120. }
  121. p = &url.URL{Host: host, Scheme: defaultScheme}
  122. return p.String()
  123. }
  124. //--------------------------------------
  125. // Main
  126. //--------------------------------------
  127. func main() {
  128. flag.Parse()
  129. if cpuprofile != "" {
  130. f, err := os.Create(cpuprofile)
  131. if err != nil {
  132. fatal(err)
  133. }
  134. pprof.StartCPUProfile(f)
  135. defer pprof.StopCPUProfile()
  136. c := make(chan os.Signal, 1)
  137. signal.Notify(c, os.Interrupt)
  138. go func() {
  139. for sig := range c {
  140. fmt.Printf("captured %v, stopping profiler and exiting..", sig)
  141. pprof.StopCPUProfile()
  142. os.Exit(1)
  143. }
  144. }()
  145. }
  146. if veryVerbose {
  147. verbose = true
  148. raft.SetLogLevel(raft.Debug)
  149. }
  150. if machines != "" {
  151. cluster = strings.Split(machines, ",")
  152. } else if machinesFile != "" {
  153. b, err := ioutil.ReadFile(machinesFile)
  154. if err != nil {
  155. fatalf("Unable to read the given machines file: %s", err)
  156. }
  157. cluster = strings.Split(string(b), ",")
  158. }
  159. raftTLSConfig, ok := tlsConfigFromInfo(argInfo.RaftTLS)
  160. if !ok {
  161. fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
  162. }
  163. etcdTLSConfig, ok := tlsConfigFromInfo(argInfo.EtcdTLS)
  164. if !ok {
  165. fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
  166. }
  167. argInfo.Name = strings.TrimSpace(argInfo.Name)
  168. if argInfo.Name == "" {
  169. fatal("ERROR: server name required. e.g. '-n=server_name'")
  170. }
  171. argInfo.RaftURL = sanitizeURL(argInfo.RaftURL, raftTLSConfig.Scheme)
  172. argInfo.EtcdURL = sanitizeURL(argInfo.EtcdURL, etcdTLSConfig.Scheme)
  173. argInfo.WebURL = sanitizeURL(argInfo.WebURL, "http")
  174. // Setup commands.
  175. registerCommands()
  176. // Read server info from file or grab it from user.
  177. if err := os.MkdirAll(dirPath, 0744); err != nil {
  178. fatalf("Unable to create path: %s", err)
  179. }
  180. info = getInfo(dirPath)
  181. // Create etcd key-value store
  182. etcdStore = store.CreateStore(maxSize)
  183. startRaft(raftTLSConfig)
  184. if argInfo.WebURL != "" {
  185. // start web
  186. argInfo.WebURL = sanitizeURL(argInfo.WebURL, "http")
  187. go webHelper()
  188. go web.Start(raftServer, argInfo.WebURL)
  189. }
  190. startEtcdTransport(*info, etcdTLSConfig.Scheme, etcdTLSConfig.Server)
  191. }
  192. // Start the raft server
  193. func startRaft(tlsConfig TLSConfig) {
  194. var err error
  195. raftName := info.Name
  196. // Create transporter for raft
  197. raftTransporter = newTransporter(tlsConfig.Scheme, tlsConfig.Client)
  198. // Create raft server
  199. raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil)
  200. if err != nil {
  201. fatal(err)
  202. }
  203. // LoadSnapshot
  204. if snapshot {
  205. err = raftServer.LoadSnapshot()
  206. if err == nil {
  207. debugf("%s finished load snapshot", raftServer.Name())
  208. } else {
  209. debug(err)
  210. }
  211. }
  212. raftServer.SetElectionTimeout(ELECTIONTIMEOUT)
  213. raftServer.SetHeartbeatTimeout(HEARTBEATTIMEOUT)
  214. raftServer.Start()
  215. if raftServer.IsLogEmpty() {
  216. // start as a leader in a new cluster
  217. if len(cluster) == 0 {
  218. time.Sleep(time.Millisecond * 20)
  219. // leader need to join self as a peer
  220. for {
  221. command := &JoinCommand{
  222. Name: raftServer.Name(),
  223. RaftURL: argInfo.RaftURL,
  224. EtcdURL: argInfo.EtcdURL,
  225. }
  226. _, err := raftServer.Do(command)
  227. if err == nil {
  228. break
  229. }
  230. }
  231. debugf("%s start as a leader", raftServer.Name())
  232. // start as a follower in a existing cluster
  233. } else {
  234. time.Sleep(time.Millisecond * 20)
  235. for i := 0; i < retryTimes; i++ {
  236. success := false
  237. for _, machine := range cluster {
  238. if len(machine) == 0 {
  239. continue
  240. }
  241. err = joinCluster(raftServer, machine)
  242. if err != nil {
  243. if err.Error() == errors[103] {
  244. fmt.Println(err)
  245. os.Exit(1)
  246. }
  247. debugf("cannot join to cluster via machine %s %s", machine, err)
  248. } else {
  249. success = true
  250. break
  251. }
  252. }
  253. if success {
  254. break
  255. }
  256. warnf("cannot join to cluster via given machines, retry in %d seconds", RETRYINTERVAL)
  257. time.Sleep(time.Second * RETRYINTERVAL)
  258. }
  259. if err != nil {
  260. fatalf("Cannot join the cluster via given machines after %x retries", retryTimes)
  261. }
  262. debugf("%s success join to the cluster", raftServer.Name())
  263. }
  264. } else {
  265. // rejoin the previous cluster
  266. debugf("%s restart as a follower", raftServer.Name())
  267. }
  268. // open the snapshot
  269. if snapshot {
  270. go raftServer.Snapshot()
  271. }
  272. // start to response to raft requests
  273. go startRaftTransport(*info, tlsConfig.Scheme, tlsConfig.Server)
  274. }
  275. // Create transporter using by raft server
  276. // Create http or https transporter based on
  277. // whether the user give the server cert and key
  278. func newTransporter(scheme string, tlsConf tls.Config) transporter {
  279. t := transporter{}
  280. t.scheme = scheme
  281. tr := &http.Transport{
  282. Dial: dialTimeout,
  283. }
  284. if scheme == "https" {
  285. tr.TLSClientConfig = &tlsConf
  286. tr.DisableCompression = true
  287. }
  288. t.client = &http.Client{Transport: tr}
  289. return t
  290. }
  291. // Dial with timeout
  292. func dialTimeout(network, addr string) (net.Conn, error) {
  293. return net.DialTimeout(network, addr, HTTPTIMEOUT)
  294. }
  295. // Start to listen and response raft command
  296. func startRaftTransport(info Info, scheme string, tlsConf tls.Config) {
  297. u, _ := url.Parse(info.RaftURL)
  298. fmt.Printf("raft server [%s] listening on %s\n", info.Name, u)
  299. raftMux := http.NewServeMux()
  300. server := &http.Server{
  301. Handler: raftMux,
  302. TLSConfig: &tlsConf,
  303. Addr: u.Host,
  304. }
  305. // internal commands
  306. raftMux.HandleFunc("/name", NameHttpHandler)
  307. raftMux.HandleFunc("/join", JoinHttpHandler)
  308. raftMux.HandleFunc("/vote", VoteHttpHandler)
  309. raftMux.HandleFunc("/log", GetLogHttpHandler)
  310. raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler)
  311. raftMux.HandleFunc("/snapshot", SnapshotHttpHandler)
  312. raftMux.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
  313. raftMux.HandleFunc("/etcdURL", EtcdURLHttpHandler)
  314. if scheme == "http" {
  315. fatal(server.ListenAndServe())
  316. } else {
  317. fatal(server.ListenAndServeTLS(info.RaftTLS.CertFile, info.RaftTLS.KeyFile))
  318. }
  319. }
  320. // Start to listen and response client command
  321. func startEtcdTransport(info Info, scheme string, tlsConf tls.Config) {
  322. u, _ := url.Parse(info.EtcdURL)
  323. fmt.Printf("etcd server [%s] listening on %s\n", info.Name, u)
  324. etcdMux := http.NewServeMux()
  325. server := &http.Server{
  326. Handler: etcdMux,
  327. TLSConfig: &tlsConf,
  328. Addr: u.Host,
  329. }
  330. // external commands
  331. etcdMux.HandleFunc("/"+version+"/keys/", Multiplexer)
  332. etcdMux.HandleFunc("/"+version+"/watch/", WatchHttpHandler)
  333. etcdMux.HandleFunc("/leader", LeaderHttpHandler)
  334. etcdMux.HandleFunc("/machines", MachinesHttpHandler)
  335. etcdMux.HandleFunc("/", VersionHttpHandler)
  336. etcdMux.HandleFunc("/stats", StatsHttpHandler)
  337. etcdMux.HandleFunc("/test/", TestHttpHandler)
  338. if scheme == "http" {
  339. fatal(server.ListenAndServe())
  340. } else {
  341. fatal(server.ListenAndServeTLS(info.EtcdTLS.CertFile, info.EtcdTLS.KeyFile))
  342. }
  343. }
  344. //--------------------------------------
  345. // Config
  346. //--------------------------------------
  // TLSConfig is the result of tlsConfigFromInfo: the URL scheme to use
  // ("http" or "https") together with the TLS settings for the listening
  // (Server) and the dialing (Client) side of a connection.
  type TLSConfig struct {
  	Scheme         string
  	Server, Client tls.Config
  }
  352. func tlsConfigFromInfo(info TLSInfo) (t TLSConfig, ok bool) {
  353. var keyFile, certFile, CAFile string
  354. var tlsCert tls.Certificate
  355. var err error
  356. t.Scheme = "http"
  357. keyFile = info.KeyFile
  358. certFile = info.CertFile
  359. CAFile = info.CAFile
  360. // If the user do not specify key file, cert file and
  361. // CA file, the type will be HTTP
  362. if keyFile == "" && certFile == "" && CAFile == "" {
  363. return t, true
  364. }
  365. // both the key and cert must be present
  366. if keyFile == "" || certFile == "" {
  367. return t, false
  368. }
  369. tlsCert, err = tls.LoadX509KeyPair(certFile, keyFile)
  370. if err != nil {
  371. fatal(err)
  372. }
  373. t.Scheme = "https"
  374. t.Server.ClientAuth, t.Server.ClientCAs = newCertPool(CAFile)
  375. // The client should trust the RootCA that the Server uses since
  376. // everyone is a peer in the network.
  377. t.Client.Certificates = []tls.Certificate{tlsCert}
  378. t.Client.RootCAs = t.Server.ClientCAs
  379. return t, true
  380. }
  381. func parseInfo(path string) *Info {
  382. file, err := os.Open(path)
  383. if err != nil {
  384. return nil
  385. }
  386. info := &Info{}
  387. defer file.Close()
  388. content, err := ioutil.ReadAll(file)
  389. if err != nil {
  390. fatalf("Unable to read info: %v", err)
  391. return nil
  392. }
  393. if err = json.Unmarshal(content, &info); err != nil {
  394. fatalf("Unable to parse info: %v", err)
  395. return nil
  396. }
  397. return info
  398. }
  399. // Get the server info from previous conf file
  400. // or from the user
  401. func getInfo(path string) *Info {
  402. // Read in the server info if available.
  403. infoPath := fmt.Sprintf("%s/info", path)
  404. // Delete the old configuration if exist
  405. if force {
  406. logPath := fmt.Sprintf("%s/log", path)
  407. confPath := fmt.Sprintf("%s/conf", path)
  408. snapshotPath := fmt.Sprintf("%s/snapshot", path)
  409. os.Remove(infoPath)
  410. os.Remove(logPath)
  411. os.Remove(confPath)
  412. os.RemoveAll(snapshotPath)
  413. }
  414. info := parseInfo(infoPath)
  415. if info != nil {
  416. fmt.Printf("Found node configuration in '%s'. Ignoring flags.\n", infoPath)
  417. return info
  418. }
  419. info = &argInfo
  420. // Write to file.
  421. content, _ := json.MarshalIndent(info, "", " ")
  422. content = []byte(string(content) + "\n")
  423. if err := ioutil.WriteFile(infoPath, content, 0644); err != nil {
  424. fatalf("Unable to write info to file: %v", err)
  425. }
  426. fmt.Printf("Wrote node configuration to '%s'.\n", infoPath)
  427. return info
  428. }
  429. // Create client auth certpool
  430. func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool) {
  431. if CAFile == "" {
  432. return tls.NoClientCert, nil
  433. }
  434. pemByte, _ := ioutil.ReadFile(CAFile)
  435. block, pemByte := pem.Decode(pemByte)
  436. cert, err := x509.ParseCertificate(block.Bytes)
  437. if err != nil {
  438. fatal(err)
  439. }
  440. certPool := x509.NewCertPool()
  441. certPool.AddCert(cert)
  442. return tls.RequireAndVerifyClientCert, certPool
  443. }
  444. // Send join requests to the leader.
  445. func joinCluster(s *raft.Server, raftURL string) error {
  446. var b bytes.Buffer
  447. command := &JoinCommand{
  448. Name: s.Name(),
  449. RaftURL: info.RaftURL,
  450. EtcdURL: info.EtcdURL,
  451. }
  452. json.NewEncoder(&b).Encode(command)
  453. // t must be ok
  454. t, ok := raftServer.Transporter().(transporter)
  455. if !ok {
  456. panic("wrong type")
  457. }
  458. joinURL := url.URL{Host: raftURL, Scheme: raftTransporter.scheme, Path: "/join"}
  459. debugf("Send Join Request to %s", raftURL)
  460. resp, err := t.Post(joinURL.String(), &b)
  461. for {
  462. if err != nil {
  463. return fmt.Errorf("Unable to join: %v", err)
  464. }
  465. if resp != nil {
  466. defer resp.Body.Close()
  467. if resp.StatusCode == http.StatusOK {
  468. return nil
  469. }
  470. if resp.StatusCode == http.StatusTemporaryRedirect {
  471. address := resp.Header.Get("Location")
  472. debugf("Send Join Request to %s", address)
  473. json.NewEncoder(&b).Encode(command)
  474. resp, err = t.Post(address, &b)
  475. } else if resp.StatusCode == http.StatusBadRequest {
  476. debug("Reach max number machines in the cluster")
  477. return fmt.Errorf(errors[103])
  478. } else {
  479. return fmt.Errorf("Unable to join")
  480. }
  481. }
  482. }
  483. return fmt.Errorf("Unable to join: %v", err)
  484. }
  485. // Register commands to raft server
  486. func registerCommands() {
  487. raft.RegisterCommand(&JoinCommand{})
  488. raft.RegisterCommand(&SetCommand{})
  489. raft.RegisterCommand(&GetCommand{})
  490. raft.RegisterCommand(&DeleteCommand{})
  491. raft.RegisterCommand(&WatchCommand{})
  492. raft.RegisterCommand(&TestAndSetCommand{})
  493. }