starter.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package starter
  15. import (
  16. "encoding/json"
  17. "flag"
  18. "fmt"
  19. "io/ioutil"
  20. "log"
  21. "net/http"
  22. "net/url"
  23. "os"
  24. "path"
  25. "strings"
  26. "syscall"
  27. "github.com/coreos/etcd/client"
  28. "github.com/coreos/etcd/etcdmain"
  29. "github.com/coreos/etcd/migrate"
  30. "github.com/coreos/etcd/pkg/fileutil"
  31. "github.com/coreos/etcd/pkg/flags"
  32. "github.com/coreos/etcd/pkg/osutil"
  33. "github.com/coreos/etcd/pkg/types"
  34. etcdversion "github.com/coreos/etcd/version"
  35. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  36. )
  37. type version string
  38. const (
  39. internalV1 version = "1"
  40. internalV2 version = "2"
  41. internalV2Proxy version = "2.proxy"
  42. internalUnknown version = "unknown"
  43. v0_4 version = "v0.4"
  44. v2_0 version = "v2.0"
  45. v2_0Proxy version = "v2.0 proxy"
  46. empty version = "empty"
  47. unknown version = "unknown"
  48. defaultInternalV1etcdBinaryDir = "/usr/libexec/etcd/internal_versions/"
  49. )
  50. var (
  51. v2SpecialFlags = []string{
  52. "initial-cluster",
  53. "listen-peer-urls",
  54. "listen-client-urls",
  55. "proxy",
  56. }
  57. )
  58. func StartDesiredVersion(args []string) {
  59. fs, err := parseConfig(args)
  60. if err != nil {
  61. return
  62. }
  63. if fs.Lookup("version").Value.String() == "true" {
  64. fmt.Println("etcd version", etcdversion.Version)
  65. os.Exit(0)
  66. }
  67. ver := checkInternalVersion(fs)
  68. log.Printf("starter: start etcd version %s", ver)
  69. switch ver {
  70. case internalV1:
  71. startInternalV1()
  72. case internalV2:
  73. case internalV2Proxy:
  74. if _, err := os.Stat(standbyInfo4(fs.Lookup("data-dir").Value.String())); err != nil {
  75. log.Printf("starter: Detect standby_info file exists, and add --proxy=on flag to ensure it runs in v2.0 proxy mode.")
  76. log.Printf("starter: Before removing v0.4 data, --proxy=on flag MUST be added.")
  77. }
  78. // append proxy flag to args to trigger proxy mode
  79. os.Args = append(os.Args, "-proxy=on")
  80. default:
  81. log.Panicf("starter: unhandled start version")
  82. }
  83. }
  84. func checkInternalVersion(fs *flag.FlagSet) version {
  85. // If it uses 2.0 env var explicitly, start 2.0
  86. for _, name := range v2SpecialFlags {
  87. if fs.Lookup(name).Value.String() != "" {
  88. return internalV2
  89. }
  90. }
  91. dataDir := fs.Lookup("data-dir").Value.String()
  92. if dataDir == "" {
  93. log.Fatalf("starter: please set --data-dir or ETCD_DATA_DIR for etcd")
  94. }
  95. // check the data directory
  96. dataver, err := checkVersion(dataDir)
  97. if err != nil {
  98. log.Fatalf("starter: failed to detect etcd version in %v: %v", dataDir, err)
  99. }
  100. log.Printf("starter: detect etcd version %s in %s", dataver, dataDir)
  101. switch dataver {
  102. case v2_0:
  103. return internalV2
  104. case v2_0Proxy:
  105. return internalV2Proxy
  106. case v0_4:
  107. standbyInfo, err := migrate.DecodeStandbyInfo4FromFile(standbyInfo4(dataDir))
  108. if err != nil && !os.IsNotExist(err) {
  109. log.Fatalf("starter: failed to decode standbyInfo in %v: %v", dataDir, err)
  110. }
  111. inStandbyMode := standbyInfo != nil && standbyInfo.Running
  112. if inStandbyMode {
  113. ver, err := checkInternalVersionByClientURLs(standbyInfo.ClientURLs(), clientTLSInfo(fs))
  114. if err != nil {
  115. log.Printf("starter: failed to check start version through peers: %v", err)
  116. return internalV1
  117. }
  118. if ver == internalV2 {
  119. osutil.Unsetenv("ETCD_DISCOVERY")
  120. os.Args = append(os.Args, "-initial-cluster", standbyInfo.InitialCluster())
  121. return internalV2Proxy
  122. }
  123. return ver
  124. }
  125. ver, err := checkInternalVersionByDataDir4(dataDir)
  126. if err != nil {
  127. log.Fatalf("starter: failed to check start version in %v: %v", dataDir, err)
  128. }
  129. return ver
  130. case empty:
  131. discovery := fs.Lookup("discovery").Value.String()
  132. dpeers, err := getPeersFromDiscoveryURL(discovery)
  133. if err != nil {
  134. log.Printf("starter: failed to get peers from discovery %s: %v", discovery, err)
  135. }
  136. peerStr := fs.Lookup("peers").Value.String()
  137. ppeers := getPeersFromPeersFlag(peerStr, peerTLSInfo(fs))
  138. urls := getClientURLsByPeerURLs(append(dpeers, ppeers...), peerTLSInfo(fs))
  139. ver, err := checkInternalVersionByClientURLs(urls, clientTLSInfo(fs))
  140. if err != nil {
  141. log.Printf("starter: failed to check start version through peers: %v", err)
  142. return internalV2
  143. }
  144. return ver
  145. }
  146. // never reach here
  147. log.Panicf("starter: unhandled etcd version in %v", dataDir)
  148. return internalUnknown
  149. }
  150. func checkVersion(dataDir string) (version, error) {
  151. names, err := fileutil.ReadDir(dataDir)
  152. if err != nil {
  153. if os.IsNotExist(err) {
  154. err = nil
  155. }
  156. return empty, err
  157. }
  158. if len(names) == 0 {
  159. return empty, nil
  160. }
  161. nameSet := types.NewUnsafeSet(names...)
  162. if nameSet.ContainsAll([]string{"member"}) {
  163. return v2_0, nil
  164. }
  165. if nameSet.ContainsAll([]string{"proxy"}) {
  166. return v2_0Proxy, nil
  167. }
  168. if nameSet.ContainsAll([]string{"snapshot", "conf", "log"}) {
  169. return v0_4, nil
  170. }
  171. if nameSet.ContainsAll([]string{"standby_info"}) {
  172. return v0_4, nil
  173. }
  174. return unknown, fmt.Errorf("failed to check version")
  175. }
  176. func checkInternalVersionByDataDir4(dataDir string) (version, error) {
  177. // check v0.4 snapshot
  178. snap4, err := migrate.DecodeLatestSnapshot4FromDir(snapDir4(dataDir))
  179. if err != nil {
  180. return internalUnknown, err
  181. }
  182. if snap4 != nil {
  183. st := &migrate.Store4{}
  184. if err := json.Unmarshal(snap4.State, st); err != nil {
  185. return internalUnknown, err
  186. }
  187. dir := st.Root.Children["_etcd"]
  188. n, ok := dir.Children["next-internal-version"]
  189. if ok && n.Value == "2" {
  190. return internalV2, nil
  191. }
  192. }
  193. // check v0.4 log
  194. ents4, err := migrate.DecodeLog4FromFile(logFile4(dataDir))
  195. if err != nil {
  196. return internalUnknown, err
  197. }
  198. for _, e := range ents4 {
  199. cmd, err := migrate.NewCommand4(e.GetCommandName(), e.GetCommand(), nil)
  200. if err != nil {
  201. return internalUnknown, err
  202. }
  203. setcmd, ok := cmd.(*migrate.SetCommand)
  204. if !ok {
  205. continue
  206. }
  207. if setcmd.Key == "/_etcd/next-internal-version" && setcmd.Value == "2" {
  208. return internalV2, nil
  209. }
  210. }
  211. return internalV1, nil
  212. }
  213. func getClientURLsByPeerURLs(peers []string, tls *TLSInfo) []string {
  214. c, err := newDefaultClient(tls)
  215. if err != nil {
  216. log.Printf("starter: new client error: %v", err)
  217. return nil
  218. }
  219. var urls []string
  220. for _, u := range peers {
  221. resp, err := c.Get(u + "/etcdURL")
  222. if err != nil {
  223. log.Printf("starter: failed to get /etcdURL from %s", u)
  224. continue
  225. }
  226. b, err := ioutil.ReadAll(resp.Body)
  227. if err != nil {
  228. log.Printf("starter: failed to read body from %s", u)
  229. continue
  230. }
  231. urls = append(urls, string(b))
  232. }
  233. return urls
  234. }
  235. func checkInternalVersionByClientURLs(urls []string, tls *TLSInfo) (version, error) {
  236. c, err := newDefaultClient(tls)
  237. if err != nil {
  238. return internalUnknown, err
  239. }
  240. for _, u := range urls {
  241. resp, err := c.Get(u + "/version")
  242. if err != nil {
  243. log.Printf("starter: failed to get /version from %s", u)
  244. continue
  245. }
  246. b, err := ioutil.ReadAll(resp.Body)
  247. if err != nil {
  248. log.Printf("starter: failed to read body from %s", u)
  249. continue
  250. }
  251. var m map[string]string
  252. err = json.Unmarshal(b, &m)
  253. if err != nil {
  254. log.Printf("starter: failed to unmarshal body %s from %s", b, u)
  255. continue
  256. }
  257. switch m["internalVersion"] {
  258. case "1":
  259. return internalV1, nil
  260. case "2":
  261. return internalV2, nil
  262. default:
  263. log.Printf("starter: unrecognized internal version %s from %s", m["internalVersion"], u)
  264. }
  265. }
  266. return internalUnknown, fmt.Errorf("failed to get version from urls %v", urls)
  267. }
  268. func getPeersFromDiscoveryURL(discoverURL string) ([]string, error) {
  269. if discoverURL == "" {
  270. return nil, nil
  271. }
  272. u, err := url.Parse(discoverURL)
  273. if err != nil {
  274. return nil, err
  275. }
  276. token := u.Path
  277. u.Path = ""
  278. c, err := client.NewHTTPClient(&http.Transport{}, []string{u.String()})
  279. if err != nil {
  280. return nil, err
  281. }
  282. dc := client.NewDiscoveryKeysAPI(c)
  283. ctx, cancel := context.WithTimeout(context.Background(), client.DefaultRequestTimeout)
  284. resp, err := dc.Get(ctx, token)
  285. cancel()
  286. if err != nil {
  287. return nil, err
  288. }
  289. peers := make([]string, 0)
  290. // append non-config keys to peers
  291. for _, n := range resp.Node.Nodes {
  292. if g := path.Base(n.Key); g == "_config" || g == "_state" {
  293. continue
  294. }
  295. peers = append(peers, n.Value)
  296. }
  297. return peers, nil
  298. }
  299. func getPeersFromPeersFlag(str string, tls *TLSInfo) []string {
  300. peers := trimSplit(str, ",")
  301. for i, p := range peers {
  302. peers[i] = tls.Scheme() + "://" + p
  303. }
  304. return peers
  305. }
  306. func startInternalV1() {
  307. p := os.Getenv("ETCD_BINARY_DIR")
  308. if p == "" {
  309. p = defaultInternalV1etcdBinaryDir
  310. }
  311. p = path.Join(p, "1")
  312. err := syscall.Exec(p, os.Args, syscall.Environ())
  313. if err != nil {
  314. log.Fatalf("starter: failed to execute internal v1 etcd: %v", err)
  315. }
  316. }
  317. func newDefaultClient(tls *TLSInfo) (*http.Client, error) {
  318. tr := &http.Transport{}
  319. if tls.Scheme() == "https" {
  320. tlsConfig, err := tls.ClientConfig()
  321. if err != nil {
  322. return nil, err
  323. }
  324. tr.TLSClientConfig = tlsConfig
  325. }
  326. return &http.Client{Transport: tr}, nil
  327. }
  328. type value struct {
  329. isBoolFlag bool
  330. s string
  331. }
  332. func (v *value) String() string { return v.s }
  333. func (v *value) Set(s string) error {
  334. v.s = s
  335. return nil
  336. }
  337. func (v *value) IsBoolFlag() bool { return v.isBoolFlag }
  338. type boolFlag interface {
  339. flag.Value
  340. IsBoolFlag() bool
  341. }
  342. // parseConfig parses out the input config from cmdline arguments and
  343. // environment variables.
  344. func parseConfig(args []string) (*flag.FlagSet, error) {
  345. fs := flag.NewFlagSet("full flagset", flag.ContinueOnError)
  346. etcdmain.NewConfig().VisitAll(func(f *flag.Flag) {
  347. _, isBoolFlag := f.Value.(boolFlag)
  348. fs.Var(&value{isBoolFlag: isBoolFlag}, f.Name, "")
  349. })
  350. if err := fs.Parse(args); err != nil {
  351. return nil, err
  352. }
  353. if err := flags.SetFlagsFromEnv(fs); err != nil {
  354. return nil, err
  355. }
  356. return fs, nil
  357. }
  358. func clientTLSInfo(fs *flag.FlagSet) *TLSInfo {
  359. return &TLSInfo{
  360. CAFile: fs.Lookup("ca-file").Value.String(),
  361. CertFile: fs.Lookup("cert-file").Value.String(),
  362. KeyFile: fs.Lookup("key-file").Value.String(),
  363. }
  364. }
  365. func peerTLSInfo(fs *flag.FlagSet) *TLSInfo {
  366. return &TLSInfo{
  367. CAFile: fs.Lookup("peer-ca-file").Value.String(),
  368. CertFile: fs.Lookup("peer-cert-file").Value.String(),
  369. KeyFile: fs.Lookup("peer-key-file").Value.String(),
  370. }
  371. }
  372. func snapDir4(dataDir string) string {
  373. return path.Join(dataDir, "snapshot")
  374. }
  375. func logFile4(dataDir string) string {
  376. return path.Join(dataDir, "log")
  377. }
  378. func standbyInfo4(dataDir string) string {
  379. return path.Join(dataDir, "standby_info")
  380. }
  381. func trimSplit(s, sep string) []string {
  382. trimmed := strings.Split(s, sep)
  383. for i := range trimmed {
  384. trimmed[i] = strings.TrimSpace(trimmed[i])
  385. }
  386. return trimmed
  387. }