member.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rpcpb
  15. import (
  16. "context"
  17. "crypto/tls"
  18. "fmt"
  19. "net/url"
  20. "os"
  21. "time"
  22. "go.etcd.io/etcd/clientv3"
  23. "go.etcd.io/etcd/clientv3/snapshot"
  24. pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
  25. "go.etcd.io/etcd/pkg/logutil"
  26. "go.etcd.io/etcd/pkg/transport"
  27. "github.com/dustin/go-humanize"
  28. "go.uber.org/zap"
  29. grpc "google.golang.org/grpc"
  30. "google.golang.org/grpc/credentials"
  31. )
  32. // ElectionTimeout returns an election timeout duration.
  33. func (m *Member) ElectionTimeout() time.Duration {
  34. return time.Duration(m.Etcd.ElectionTimeoutMs) * time.Millisecond
  35. }
  36. // DialEtcdGRPCServer creates a raw gRPC connection to an etcd member.
  37. func (m *Member) DialEtcdGRPCServer(opts ...grpc.DialOption) (*grpc.ClientConn, error) {
  38. dialOpts := []grpc.DialOption{
  39. grpc.WithTimeout(5 * time.Second),
  40. grpc.WithBlock(),
  41. }
  42. secure := false
  43. for _, cu := range m.Etcd.AdvertiseClientURLs {
  44. u, err := url.Parse(cu)
  45. if err != nil {
  46. return nil, err
  47. }
  48. if u.Scheme == "https" { // TODO: handle unix
  49. secure = true
  50. }
  51. }
  52. if secure {
  53. // assume save TLS assets are already stord on disk
  54. tlsInfo := transport.TLSInfo{
  55. CertFile: m.ClientCertPath,
  56. KeyFile: m.ClientKeyPath,
  57. TrustedCAFile: m.ClientTrustedCAPath,
  58. // TODO: remove this with generated certs
  59. // only need it for auto TLS
  60. InsecureSkipVerify: true,
  61. }
  62. tlsConfig, err := tlsInfo.ClientConfig()
  63. if err != nil {
  64. return nil, err
  65. }
  66. creds := credentials.NewTLS(tlsConfig)
  67. dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
  68. } else {
  69. dialOpts = append(dialOpts, grpc.WithInsecure())
  70. }
  71. dialOpts = append(dialOpts, opts...)
  72. return grpc.Dial(m.EtcdClientEndpoint, dialOpts...)
  73. }
  74. // CreateEtcdClientConfig creates a client configuration from member.
  75. func (m *Member) CreateEtcdClientConfig(opts ...grpc.DialOption) (cfg *clientv3.Config, err error) {
  76. secure := false
  77. for _, cu := range m.Etcd.AdvertiseClientURLs {
  78. var u *url.URL
  79. u, err = url.Parse(cu)
  80. if err != nil {
  81. return nil, err
  82. }
  83. if u.Scheme == "https" { // TODO: handle unix
  84. secure = true
  85. }
  86. }
  87. // TODO: make this configurable
  88. level := "error"
  89. if os.Getenv("ETCD_CLIENT_DEBUG") != "" {
  90. level = "debug"
  91. }
  92. lcfg := logutil.DefaultZapLoggerConfig
  93. lcfg.Level = zap.NewAtomicLevelAt(logutil.ConvertToZapLevel(level))
  94. cfg = &clientv3.Config{
  95. Endpoints: []string{m.EtcdClientEndpoint},
  96. DialTimeout: 10 * time.Second,
  97. DialOptions: opts,
  98. LogConfig: &lcfg,
  99. }
  100. if secure {
  101. // assume save TLS assets are already stord on disk
  102. tlsInfo := transport.TLSInfo{
  103. CertFile: m.ClientCertPath,
  104. KeyFile: m.ClientKeyPath,
  105. TrustedCAFile: m.ClientTrustedCAPath,
  106. // TODO: remove this with generated certs
  107. // only need it for auto TLS
  108. InsecureSkipVerify: true,
  109. }
  110. var tlsConfig *tls.Config
  111. tlsConfig, err = tlsInfo.ClientConfig()
  112. if err != nil {
  113. return nil, err
  114. }
  115. cfg.TLS = tlsConfig
  116. }
  117. return cfg, err
  118. }
  119. // CreateEtcdClient creates a client from member.
  120. func (m *Member) CreateEtcdClient(opts ...grpc.DialOption) (*clientv3.Client, error) {
  121. cfg, err := m.CreateEtcdClientConfig(opts...)
  122. if err != nil {
  123. return nil, err
  124. }
  125. return clientv3.New(*cfg)
  126. }
  127. // CheckCompact ensures that historical data before given revision has been compacted.
  128. func (m *Member) CheckCompact(rev int64) error {
  129. cli, err := m.CreateEtcdClient()
  130. if err != nil {
  131. return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
  132. }
  133. defer cli.Close()
  134. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  135. wch := cli.Watch(ctx, "\x00", clientv3.WithFromKey(), clientv3.WithRev(rev-1))
  136. wr, ok := <-wch
  137. cancel()
  138. if !ok {
  139. return fmt.Errorf("watch channel terminated (endpoint %q)", m.EtcdClientEndpoint)
  140. }
  141. if wr.CompactRevision != rev {
  142. return fmt.Errorf("got compact revision %v, wanted %v (endpoint %q)", wr.CompactRevision, rev, m.EtcdClientEndpoint)
  143. }
  144. return nil
  145. }
  146. // Defrag runs defragmentation on this member.
  147. func (m *Member) Defrag() error {
  148. cli, err := m.CreateEtcdClient()
  149. if err != nil {
  150. return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
  151. }
  152. defer cli.Close()
  153. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
  154. _, err = cli.Defragment(ctx, m.EtcdClientEndpoint)
  155. cancel()
  156. return err
  157. }
  158. // RevHash fetches current revision and hash on this member.
  159. func (m *Member) RevHash() (int64, int64, error) {
  160. conn, err := m.DialEtcdGRPCServer()
  161. if err != nil {
  162. return 0, 0, err
  163. }
  164. defer conn.Close()
  165. mt := pb.NewMaintenanceClient(conn)
  166. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  167. resp, err := mt.Hash(ctx, &pb.HashRequest{}, grpc.FailFast(false))
  168. cancel()
  169. if err != nil {
  170. return 0, 0, err
  171. }
  172. return resp.Header.Revision, int64(resp.Hash), nil
  173. }
  174. // Rev fetches current revision on this member.
  175. func (m *Member) Rev(ctx context.Context) (int64, error) {
  176. cli, err := m.CreateEtcdClient()
  177. if err != nil {
  178. return 0, fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
  179. }
  180. defer cli.Close()
  181. resp, err := cli.Status(ctx, m.EtcdClientEndpoint)
  182. if err != nil {
  183. return 0, err
  184. }
  185. return resp.Header.Revision, nil
  186. }
  187. // Compact compacts member storage with given revision.
  188. // It blocks until it's physically done.
  189. func (m *Member) Compact(rev int64, timeout time.Duration) error {
  190. cli, err := m.CreateEtcdClient()
  191. if err != nil {
  192. return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
  193. }
  194. defer cli.Close()
  195. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  196. _, err = cli.Compact(ctx, rev, clientv3.WithCompactPhysical())
  197. cancel()
  198. return err
  199. }
  200. // IsLeader returns true if this member is the current cluster leader.
  201. func (m *Member) IsLeader() (bool, error) {
  202. cli, err := m.CreateEtcdClient()
  203. if err != nil {
  204. return false, fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
  205. }
  206. defer cli.Close()
  207. resp, err := cli.Status(context.Background(), m.EtcdClientEndpoint)
  208. if err != nil {
  209. return false, err
  210. }
  211. return resp.Header.MemberId == resp.Leader, nil
  212. }
  213. // WriteHealthKey writes a health key to this member.
  214. func (m *Member) WriteHealthKey() error {
  215. cli, err := m.CreateEtcdClient()
  216. if err != nil {
  217. return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
  218. }
  219. defer cli.Close()
  220. // give enough time-out in case expensive requests (range/delete) are pending
  221. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  222. _, err = cli.Put(ctx, "health", "good")
  223. cancel()
  224. if err != nil {
  225. return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
  226. }
  227. return nil
  228. }
  229. // SaveSnapshot downloads a snapshot file from this member, locally.
  230. // It's meant to requested remotely, so that local member can store
  231. // snapshot file on local disk.
  232. func (m *Member) SaveSnapshot(lg *zap.Logger) (err error) {
  233. // remove existing snapshot first
  234. if err = os.RemoveAll(m.SnapshotPath); err != nil {
  235. return err
  236. }
  237. var ccfg *clientv3.Config
  238. ccfg, err = m.CreateEtcdClientConfig()
  239. if err != nil {
  240. return fmt.Errorf("%v (%q)", err, m.EtcdClientEndpoint)
  241. }
  242. lg.Info(
  243. "snapshot save START",
  244. zap.String("member-name", m.Etcd.Name),
  245. zap.Strings("member-client-urls", m.Etcd.AdvertiseClientURLs),
  246. zap.String("snapshot-path", m.SnapshotPath),
  247. )
  248. now := time.Now()
  249. mgr := snapshot.NewV3(lg)
  250. if err = mgr.Save(context.Background(), *ccfg, m.SnapshotPath); err != nil {
  251. return err
  252. }
  253. took := time.Since(now)
  254. var fi os.FileInfo
  255. fi, err = os.Stat(m.SnapshotPath)
  256. if err != nil {
  257. return err
  258. }
  259. var st snapshot.Status
  260. st, err = mgr.Status(m.SnapshotPath)
  261. if err != nil {
  262. return err
  263. }
  264. m.SnapshotInfo = &SnapshotInfo{
  265. MemberName: m.Etcd.Name,
  266. MemberClientURLs: m.Etcd.AdvertiseClientURLs,
  267. SnapshotPath: m.SnapshotPath,
  268. SnapshotFileSize: humanize.Bytes(uint64(fi.Size())),
  269. SnapshotTotalSize: humanize.Bytes(uint64(st.TotalSize)),
  270. SnapshotTotalKey: int64(st.TotalKey),
  271. SnapshotHash: int64(st.Hash),
  272. SnapshotRevision: st.Revision,
  273. Took: fmt.Sprintf("%v", took),
  274. }
  275. lg.Info(
  276. "snapshot save END",
  277. zap.String("member-name", m.SnapshotInfo.MemberName),
  278. zap.Strings("member-client-urls", m.SnapshotInfo.MemberClientURLs),
  279. zap.String("snapshot-path", m.SnapshotPath),
  280. zap.String("snapshot-file-size", m.SnapshotInfo.SnapshotFileSize),
  281. zap.String("snapshot-total-size", m.SnapshotInfo.SnapshotTotalSize),
  282. zap.Int64("snapshot-total-key", m.SnapshotInfo.SnapshotTotalKey),
  283. zap.Int64("snapshot-hash", m.SnapshotInfo.SnapshotHash),
  284. zap.Int64("snapshot-revision", m.SnapshotInfo.SnapshotRevision),
  285. zap.String("took", m.SnapshotInfo.Took),
  286. )
  287. return nil
  288. }
  289. // RestoreSnapshot restores a cluster from a given snapshot file on disk.
  290. // It's meant to requested remotely, so that local member can load the
  291. // snapshot file from local disk.
  292. func (m *Member) RestoreSnapshot(lg *zap.Logger) (err error) {
  293. if err = os.RemoveAll(m.EtcdOnSnapshotRestore.DataDir); err != nil {
  294. return err
  295. }
  296. if err = os.RemoveAll(m.EtcdOnSnapshotRestore.WALDir); err != nil {
  297. return err
  298. }
  299. lg.Info(
  300. "snapshot restore START",
  301. zap.String("member-name", m.Etcd.Name),
  302. zap.Strings("member-client-urls", m.Etcd.AdvertiseClientURLs),
  303. zap.String("snapshot-path", m.SnapshotPath),
  304. )
  305. now := time.Now()
  306. mgr := snapshot.NewV3(lg)
  307. err = mgr.Restore(snapshot.RestoreConfig{
  308. SnapshotPath: m.SnapshotInfo.SnapshotPath,
  309. Name: m.EtcdOnSnapshotRestore.Name,
  310. OutputDataDir: m.EtcdOnSnapshotRestore.DataDir,
  311. OutputWALDir: m.EtcdOnSnapshotRestore.WALDir,
  312. PeerURLs: m.EtcdOnSnapshotRestore.AdvertisePeerURLs,
  313. InitialCluster: m.EtcdOnSnapshotRestore.InitialCluster,
  314. InitialClusterToken: m.EtcdOnSnapshotRestore.InitialClusterToken,
  315. SkipHashCheck: false,
  316. // TODO: set SkipHashCheck it true, to recover from existing db file
  317. })
  318. took := time.Since(now)
  319. lg.Info(
  320. "snapshot restore END",
  321. zap.String("member-name", m.SnapshotInfo.MemberName),
  322. zap.Strings("member-client-urls", m.SnapshotInfo.MemberClientURLs),
  323. zap.String("snapshot-path", m.SnapshotPath),
  324. zap.String("snapshot-file-size", m.SnapshotInfo.SnapshotFileSize),
  325. zap.String("snapshot-total-size", m.SnapshotInfo.SnapshotTotalSize),
  326. zap.Int64("snapshot-total-key", m.SnapshotInfo.SnapshotTotalKey),
  327. zap.Int64("snapshot-hash", m.SnapshotInfo.SnapshotHash),
  328. zap.Int64("snapshot-revision", m.SnapshotInfo.SnapshotRevision),
  329. zap.String("took", took.String()),
  330. zap.Error(err),
  331. )
  332. return err
  333. }