etcd.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. package main
  2. import (
  3. "crypto/tls"
  4. "crypto/x509"
  5. "encoding/pem"
  6. "flag"
  7. "github.com/coreos/etcd/store"
  8. "github.com/coreos/etcd/web"
  9. "io/ioutil"
  10. "net"
  11. "net/http"
  12. "net/url"
  13. "os"
  14. "os/signal"
  15. "runtime/pprof"
  16. "strings"
  17. "time"
  18. )
  19. //------------------------------------------------------------------------------
  20. //
  21. // Initialization
  22. //
  23. //------------------------------------------------------------------------------
  24. var verbose bool
  25. var veryVerbose bool
  26. var machines string
  27. var machinesFile string
  28. var cluster []string
  29. var argInfo Info
  30. var dirPath string
  31. var force bool
  32. var maxSize int
  33. var snapshot bool
  34. var retryTimes int
  35. var maxClusterSize int
  36. var cpuprofile string
  37. func init() {
  38. flag.BoolVar(&verbose, "v", false, "verbose logging")
  39. flag.BoolVar(&veryVerbose, "vv", false, "very verbose logging")
  40. flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma")
  41. flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma")
  42. flag.StringVar(&argInfo.Name, "n", "default-name", "the node name (required)")
  43. flag.StringVar(&argInfo.EtcdURL, "c", "127.0.0.1:4001", "the hostname:port for etcd client communication")
  44. flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the hostname:port for raft server communication")
  45. flag.StringVar(&argInfo.WebURL, "w", "", "the hostname:port of web interface")
  46. flag.StringVar(&argInfo.RaftTLS.CAFile, "serverCAFile", "", "the path of the CAFile")
  47. flag.StringVar(&argInfo.RaftTLS.CertFile, "serverCert", "", "the cert file of the server")
  48. flag.StringVar(&argInfo.RaftTLS.KeyFile, "serverKey", "", "the key file of the server")
  49. flag.StringVar(&argInfo.EtcdTLS.CAFile, "clientCAFile", "", "the path of the client CAFile")
  50. flag.StringVar(&argInfo.EtcdTLS.CertFile, "clientCert", "", "the cert file of the client")
  51. flag.StringVar(&argInfo.EtcdTLS.KeyFile, "clientKey", "", "the key file of the client")
  52. flag.StringVar(&dirPath, "d", ".", "the directory to store log and snapshot")
  53. flag.BoolVar(&force, "f", false, "force new node configuration if existing is found (WARNING: data loss!)")
  54. flag.BoolVar(&snapshot, "snapshot", false, "open or close snapshot")
  55. flag.IntVar(&maxSize, "m", 1024, "the max size of result buffer")
  56. flag.IntVar(&retryTimes, "r", 3, "the max retry attempts when trying to join a cluster")
  57. flag.IntVar(&maxClusterSize, "maxsize", 9, "the max size of the cluster")
  58. flag.StringVar(&cpuprofile, "cpuprofile", "", "write cpu profile to file")
  59. }
  60. const (
  61. ElectionTimeout = 200 * time.Millisecond
  62. HeartbeatTimeout = 50 * time.Millisecond
  63. // Timeout for internal raft http connection
  64. // The original timeout for http is 45 seconds
  65. // which is too long for our usage.
  66. HTTPTimeout = 10 * time.Second
  67. RetryInterval = 10
  68. )
  69. //------------------------------------------------------------------------------
  70. //
  71. // Typedefs
  72. //
  73. //------------------------------------------------------------------------------
// TLSInfo holds the file paths of the TLS material for one endpoint. The
// fields are filled from command-line flags; an empty string means the file
// was not supplied.
type TLSInfo struct {
	CertFile string `json:"CertFile"` // path of the certificate file
	KeyFile  string `json:"KeyFile"`  // path of the private key file
	CAFile   string `json:"CAFile"`   // path of the CA certificate file; may be empty
}
// Info describes a node: its name, the addresses it uses and the TLS files
// for its raft and etcd endpoints. The JSON tags indicate it is meant to be
// serialized; the code doing so is outside this view.
type Info struct {
	Name    string  `json:"name"`    // node name, whitespace-trimmed by main
	RaftURL string  `json:"raftURL"` // address for raft server communication
	EtcdURL string  `json:"etcdURL"` // address for etcd client communication
	WebURL  string  `json:"webURL"`  // address of the web interface; empty when not requested
	RaftTLS TLSInfo `json:"raftTLS"` // TLS files for the raft endpoint
	EtcdTLS TLSInfo `json:"etcdTLS"` // TLS files for the etcd client endpoint
}
  87. type TLSConfig struct {
  88. Scheme string
  89. Server tls.Config
  90. Client tls.Config
  91. }
  92. //------------------------------------------------------------------------------
  93. //
  94. // Variables
  95. //
  96. //------------------------------------------------------------------------------
  97. var etcdStore *store.Store
  98. var info *Info
  99. //------------------------------------------------------------------------------
  100. //
  101. // Functions
  102. //
  103. //------------------------------------------------------------------------------
  104. //--------------------------------------
  105. // Main
  106. //--------------------------------------
  107. func main() {
  108. flag.Parse()
  109. if cpuprofile != "" {
  110. f, err := os.Create(cpuprofile)
  111. if err != nil {
  112. fatal(err)
  113. }
  114. pprof.StartCPUProfile(f)
  115. defer pprof.StopCPUProfile()
  116. c := make(chan os.Signal, 1)
  117. signal.Notify(c, os.Interrupt)
  118. go func() {
  119. for sig := range c {
  120. infof("captured %v, stopping profiler and exiting..", sig)
  121. pprof.StopCPUProfile()
  122. os.Exit(1)
  123. }
  124. }()
  125. }
  126. if veryVerbose {
  127. verbose = true
  128. }
  129. if machines != "" {
  130. cluster = strings.Split(machines, ",")
  131. } else if machinesFile != "" {
  132. b, err := ioutil.ReadFile(machinesFile)
  133. if err != nil {
  134. fatalf("Unable to read the given machines file: %s", err)
  135. }
  136. cluster = strings.Split(string(b), ",")
  137. }
  138. raftTLSConfig, ok := tlsConfigFromInfo(argInfo.RaftTLS)
  139. if !ok {
  140. fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
  141. }
  142. etcdTLSConfig, ok := tlsConfigFromInfo(argInfo.EtcdTLS)
  143. if !ok {
  144. fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
  145. }
  146. argInfo.Name = strings.TrimSpace(argInfo.Name)
  147. if argInfo.Name == "" {
  148. fatal("ERROR: server name required. e.g. '-n=server_name'")
  149. }
  150. argInfo.RaftURL = sanitizeURL(argInfo.RaftURL, raftTLSConfig.Scheme)
  151. argInfo.EtcdURL = sanitizeURL(argInfo.EtcdURL, etcdTLSConfig.Scheme)
  152. argInfo.WebURL = sanitizeURL(argInfo.WebURL, "http")
  153. // Setup commands.
  154. registerCommands()
  155. // Read server info from file or grab it from user.
  156. if err := os.MkdirAll(dirPath, 0744); err != nil {
  157. fatalf("Unable to create path: %s", err)
  158. }
  159. info = getInfo(dirPath)
  160. // Create etcd key-value store
  161. etcdStore = store.CreateStore(maxSize)
  162. snapConf = newSnapshotConf()
  163. startRaft(raftTLSConfig)
  164. if argInfo.WebURL != "" {
  165. // start web
  166. argInfo.WebURL = sanitizeURL(argInfo.WebURL, "http")
  167. go webHelper()
  168. go web.Start(raftServer, argInfo.WebURL)
  169. }
  170. startEtcdTransport(*info, etcdTLSConfig.Scheme, etcdTLSConfig.Server)
  171. }
  172. // Create transporter using by raft server
  173. // Create http or https transporter based on
  174. // whether the user give the server cert and key
  175. func newTransporter(scheme string, tlsConf tls.Config) transporter {
  176. t := transporter{}
  177. tr := &http.Transport{
  178. Dial: dialTimeout,
  179. }
  180. if scheme == "https" {
  181. tr.TLSClientConfig = &tlsConf
  182. tr.DisableCompression = true
  183. }
  184. t.client = &http.Client{Transport: tr}
  185. return t
  186. }
  187. // Dial with timeout
  188. func dialTimeout(network, addr string) (net.Conn, error) {
  189. return net.DialTimeout(network, addr, HTTPTimeout)
  190. }
  191. type Etcd struct {
  192. http.Server
  193. url string
  194. scheme string
  195. tls TLSConfig
  196. }
  197. // Start to listen and response client command
  198. func startEtcdTransport(info Info, scheme string, tlsConf tls.Config) {
  199. u, err := url.Parse(info.EtcdURL)
  200. if err != nil {
  201. fatalf("invalid url '%s': %s", info.EtcdURL, err)
  202. }
  203. infof("etcd server [%s:%s]", info.Name, u)
  204. etcdMux := http.NewServeMux()
  205. server := &http.Server{
  206. Handler: etcdMux,
  207. TLSConfig: &tlsConf,
  208. Addr: u.Host,
  209. }
  210. // external commands
  211. etcdMux.HandleFunc("/"+version+"/keys/", Multiplexer)
  212. etcdMux.HandleFunc("/"+version+"/watch/", WatchHttpHandler)
  213. etcdMux.HandleFunc("/leader", LeaderHttpHandler)
  214. etcdMux.HandleFunc("/machines", MachinesHttpHandler)
  215. etcdMux.HandleFunc("/", VersionHttpHandler)
  216. etcdMux.HandleFunc("/stats", StatsHttpHandler)
  217. etcdMux.HandleFunc("/test/", TestHttpHandler)
  218. if scheme == "http" {
  219. fatal(server.ListenAndServe())
  220. } else {
  221. fatal(server.ListenAndServeTLS(info.EtcdTLS.CertFile, info.EtcdTLS.KeyFile))
  222. }
  223. }
  224. //--------------------------------------
  225. // Config
  226. //--------------------------------------
  227. func tlsConfigFromInfo(info TLSInfo) (t TLSConfig, ok bool) {
  228. var keyFile, certFile, CAFile string
  229. var tlsCert tls.Certificate
  230. var err error
  231. t.Scheme = "http"
  232. keyFile = info.KeyFile
  233. certFile = info.CertFile
  234. CAFile = info.CAFile
  235. // If the user do not specify key file, cert file and
  236. // CA file, the type will be HTTP
  237. if keyFile == "" && certFile == "" && CAFile == "" {
  238. return t, true
  239. }
  240. // both the key and cert must be present
  241. if keyFile == "" || certFile == "" {
  242. return t, false
  243. }
  244. tlsCert, err = tls.LoadX509KeyPair(certFile, keyFile)
  245. if err != nil {
  246. fatal(err)
  247. }
  248. t.Scheme = "https"
  249. t.Server.ClientAuth, t.Server.ClientCAs = newCertPool(CAFile)
  250. // The client should trust the RootCA that the Server uses since
  251. // everyone is a peer in the network.
  252. t.Client.Certificates = []tls.Certificate{tlsCert}
  253. t.Client.RootCAs = t.Server.ClientCAs
  254. return t, true
  255. }
  256. // newCertPool creates x509 certPool and corresponding Auth Type.
  257. // If the given CAfile is valid, add the cert into the pool and verify the clients'
  258. // certs against the cert in the pool.
  259. // If the given CAfile is empty, do not verify the clients' cert.
  260. // If the given CAfile is not valid, fatal.
  261. func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool) {
  262. if CAFile == "" {
  263. return tls.NoClientCert, nil
  264. }
  265. pemByte, err := ioutil.ReadFile(CAFile)
  266. check(err)
  267. block, pemByte := pem.Decode(pemByte)
  268. cert, err := x509.ParseCertificate(block.Bytes)
  269. check(err)
  270. certPool := x509.NewCertPool()
  271. certPool.AddCert(cert)
  272. return tls.RequireAndVerifyClientCert, certPool
  273. }