etcd.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package etcdmain
  15. import (
  16. "fmt"
  17. "log"
  18. "net"
  19. "net/http"
  20. "os"
  21. "strings"
  22. "time"
  23. "github.com/coreos/etcd/discovery"
  24. "github.com/coreos/etcd/etcdserver"
  25. "github.com/coreos/etcd/etcdserver/etcdhttp"
  26. "github.com/coreos/etcd/pkg/cors"
  27. "github.com/coreos/etcd/pkg/fileutil"
  28. "github.com/coreos/etcd/pkg/transport"
  29. "github.com/coreos/etcd/pkg/types"
  30. "github.com/coreos/etcd/proxy"
  31. "github.com/coreos/etcd/rafthttp"
  32. )
  33. const (
  34. // the owner can make/remove files inside the directory
  35. privateDirMode = 0700
  36. )
  37. func Main() {
  38. cfg := NewConfig()
  39. err := cfg.Parse(os.Args[1:])
  40. if err != nil {
  41. log.Printf("etcd: error verifying flags, %v", err)
  42. os.Exit(2)
  43. }
  44. var stopped <-chan struct{}
  45. shouldProxy := cfg.isProxy()
  46. if !shouldProxy {
  47. stopped, err = startEtcd(cfg)
  48. if err == discovery.ErrFullCluster && cfg.shouldFallbackToProxy() {
  49. log.Printf("etcd: discovery cluster full, falling back to %s", fallbackFlagProxy)
  50. shouldProxy = true
  51. }
  52. }
  53. if shouldProxy {
  54. err = startProxy(cfg)
  55. }
  56. if err != nil {
  57. switch err {
  58. case discovery.ErrDuplicateID:
  59. log.Fatalf("etcd: member %s has previously registered with discovery service (%s), but the data-dir (%s) on disk cannot be found.",
  60. cfg.name, cfg.durl, cfg.dir)
  61. default:
  62. log.Fatalf("etcd: %v", err)
  63. }
  64. }
  65. <-stopped
  66. }
  67. // startEtcd launches the etcd server and HTTP handlers for client/server communication.
  68. func startEtcd(cfg *config) (<-chan struct{}, error) {
  69. cls, err := setupCluster(cfg)
  70. if err != nil {
  71. return nil, fmt.Errorf("error setting up initial cluster: %v", err)
  72. }
  73. if cfg.dir == "" {
  74. cfg.dir = fmt.Sprintf("%v.etcd", cfg.name)
  75. log.Printf("no data-dir provided, using default data-dir ./%s", cfg.dir)
  76. }
  77. if err := os.MkdirAll(cfg.dir, privateDirMode); err != nil {
  78. return nil, fmt.Errorf("cannot create data directory: %v", err)
  79. }
  80. if err := fileutil.IsDirWriteable(cfg.dir); err != nil {
  81. return nil, fmt.Errorf("cannot write to data directory: %v", err)
  82. }
  83. pt, err := transport.NewTimeoutTransport(cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
  84. if err != nil {
  85. return nil, err
  86. }
  87. if !cfg.peerTLSInfo.Empty() {
  88. log.Printf("etcd: peerTLS: %s", cfg.peerTLSInfo)
  89. }
  90. plns := make([]net.Listener, 0)
  91. for _, u := range cfg.lpurls {
  92. var l net.Listener
  93. l, err = transport.NewTimeoutListener(u.Host, u.Scheme, cfg.peerTLSInfo, rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout)
  94. if err != nil {
  95. return nil, err
  96. }
  97. urlStr := u.String()
  98. log.Print("etcd: listening for peers on ", urlStr)
  99. defer func() {
  100. if err != nil {
  101. l.Close()
  102. log.Print("etcd: stopping listening for peers on ", urlStr)
  103. }
  104. }()
  105. plns = append(plns, l)
  106. }
  107. if !cfg.clientTLSInfo.Empty() {
  108. log.Printf("etcd: clientTLS: %s", cfg.clientTLSInfo)
  109. }
  110. clns := make([]net.Listener, 0)
  111. for _, u := range cfg.lcurls {
  112. var l net.Listener
  113. l, err = transport.NewKeepAliveListener(u.Host, u.Scheme, cfg.clientTLSInfo)
  114. if err != nil {
  115. return nil, err
  116. }
  117. urlStr := u.String()
  118. log.Print("etcd: listening for client requests on ", urlStr)
  119. defer func() {
  120. if err != nil {
  121. l.Close()
  122. log.Print("etcd: stopping listening for client requests on ", urlStr)
  123. }
  124. }()
  125. clns = append(clns, l)
  126. }
  127. srvcfg := &etcdserver.ServerConfig{
  128. Name: cfg.name,
  129. ClientURLs: cfg.acurls,
  130. PeerURLs: cfg.apurls,
  131. DataDir: cfg.dir,
  132. SnapCount: cfg.snapCount,
  133. MaxSnapFiles: cfg.maxSnapFiles,
  134. MaxWALFiles: cfg.maxWalFiles,
  135. Cluster: cls,
  136. DiscoveryURL: cfg.durl,
  137. DiscoveryProxy: cfg.dproxy,
  138. NewCluster: cfg.isNewCluster(),
  139. ForceNewCluster: cfg.forceNewCluster,
  140. Transport: pt,
  141. TickMs: cfg.TickMs,
  142. ElectionTicks: cfg.electionTicks(),
  143. }
  144. var s *etcdserver.EtcdServer
  145. s, err = etcdserver.NewServer(srvcfg)
  146. if err != nil {
  147. return nil, err
  148. }
  149. s.Start()
  150. if cfg.corsInfo.String() != "" {
  151. log.Printf("etcd: cors = %s", cfg.corsInfo)
  152. }
  153. ch := &cors.CORSHandler{
  154. Handler: etcdhttp.NewClientHandler(s),
  155. Info: cfg.corsInfo,
  156. }
  157. ph := etcdhttp.NewPeerHandler(s.Cluster, s.RaftHandler())
  158. // Start the peer server in a goroutine
  159. for _, l := range plns {
  160. go func(l net.Listener) {
  161. log.Fatal(serveHTTP(l, ph, 5*time.Minute))
  162. }(l)
  163. }
  164. // Start a client server goroutine for each listen address
  165. for _, l := range clns {
  166. go func(l net.Listener) {
  167. // read timeout does not work with http close notify
  168. // TODO: https://github.com/golang/go/issues/9524
  169. log.Fatal(serveHTTP(l, ch, 0))
  170. }(l)
  171. }
  172. return s.StopNotify(), nil
  173. }
  174. // startProxy launches an HTTP proxy for client communication which proxies to other etcd nodes.
  175. func startProxy(cfg *config) error {
  176. cls, err := setupCluster(cfg)
  177. if err != nil {
  178. return fmt.Errorf("error setting up initial cluster: %v", err)
  179. }
  180. if cfg.durl != "" {
  181. s, err := discovery.GetCluster(cfg.durl, cfg.dproxy)
  182. if err != nil {
  183. return err
  184. }
  185. if cls, err = etcdserver.NewClusterFromString(cfg.durl, s); err != nil {
  186. return err
  187. }
  188. }
  189. pt, err := transport.NewTransport(cfg.clientTLSInfo)
  190. if err != nil {
  191. return err
  192. }
  193. tr, err := transport.NewTransport(cfg.peerTLSInfo)
  194. if err != nil {
  195. return err
  196. }
  197. // TODO(jonboulle): update peerURLs dynamically (i.e. when updating
  198. // clientURLs) instead of just using the initial fixed list here
  199. peerURLs := cls.PeerURLs()
  200. uf := func() []string {
  201. cls, err := etcdserver.GetClusterFromPeers(peerURLs, tr)
  202. if err != nil {
  203. log.Printf("proxy: %v", err)
  204. return []string{}
  205. }
  206. return cls.ClientURLs()
  207. }
  208. ph := proxy.NewHandler(pt, uf)
  209. ph = &cors.CORSHandler{
  210. Handler: ph,
  211. Info: cfg.corsInfo,
  212. }
  213. if cfg.isReadonlyProxy() {
  214. ph = proxy.NewReadonlyHandler(ph)
  215. }
  216. // Start a proxy server goroutine for each listen address
  217. for _, u := range cfg.lcurls {
  218. l, err := transport.NewListener(u.Host, u.Scheme, cfg.clientTLSInfo)
  219. if err != nil {
  220. return err
  221. }
  222. host := u.Host
  223. go func() {
  224. log.Print("proxy: listening for client requests on ", host)
  225. log.Fatal(http.Serve(l, ph))
  226. }()
  227. }
  228. return nil
  229. }
  230. // setupCluster sets up an initial cluster definition for bootstrap or discovery.
  231. func setupCluster(cfg *config) (*etcdserver.Cluster, error) {
  232. var cls *etcdserver.Cluster
  233. var err error
  234. switch {
  235. case cfg.durl != "":
  236. // If using discovery, generate a temporary cluster based on
  237. // self's advertised peer URLs
  238. clusterStr := genClusterString(cfg.name, cfg.apurls)
  239. cls, err = etcdserver.NewClusterFromString(cfg.durl, clusterStr)
  240. case cfg.dnsCluster != "":
  241. clusterStr, clusterToken, err := discovery.SRVGetCluster(cfg.name, cfg.dnsCluster, cfg.initialClusterToken, cfg.apurls)
  242. if err != nil {
  243. return nil, err
  244. }
  245. cls, err = etcdserver.NewClusterFromString(clusterToken, clusterStr)
  246. default:
  247. // We're statically configured, and cluster has appropriately been set.
  248. cls, err = etcdserver.NewClusterFromString(cfg.initialClusterToken, cfg.initialCluster)
  249. }
  250. return cls, err
  251. }
  252. func genClusterString(name string, urls types.URLs) string {
  253. addrs := make([]string, 0)
  254. for _, u := range urls {
  255. addrs = append(addrs, fmt.Sprintf("%v=%v", name, u.String()))
  256. }
  257. return strings.Join(addrs, ",")
  258. }