handler.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742
  1. // Copyright 2018 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package agent
  15. import (
  16. "errors"
  17. "fmt"
  18. "io/ioutil"
  19. "net/url"
  20. "os"
  21. "os/exec"
  22. "path/filepath"
  23. "syscall"
  24. "time"
  25. "github.com/coreos/etcd/embed"
  26. "github.com/coreos/etcd/functional/rpcpb"
  27. "github.com/coreos/etcd/pkg/fileutil"
  28. "github.com/coreos/etcd/pkg/proxy"
  29. "go.uber.org/zap"
  30. )
  31. // return error for system errors (e.g. fail to create files)
  32. // return status error in response for wrong configuration/operation (e.g. start etcd twice)
  33. func (srv *Server) handleTesterRequest(req *rpcpb.Request) (resp *rpcpb.Response, err error) {
  34. defer func() {
  35. if err == nil && req != nil {
  36. srv.last = req.Operation
  37. srv.lg.Info("handler success", zap.String("operation", req.Operation.String()))
  38. }
  39. }()
  40. if req != nil {
  41. srv.Member = req.Member
  42. srv.Tester = req.Tester
  43. }
  44. switch req.Operation {
  45. case rpcpb.Operation_INITIAL_START_ETCD:
  46. return srv.handle_INITIAL_START_ETCD(req)
  47. case rpcpb.Operation_RESTART_ETCD:
  48. return srv.handle_RESTART_ETCD()
  49. case rpcpb.Operation_SIGTERM_ETCD:
  50. return srv.handle_SIGTERM_ETCD()
  51. case rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA:
  52. return srv.handle_SIGQUIT_ETCD_AND_REMOVE_DATA()
  53. case rpcpb.Operation_SAVE_SNAPSHOT:
  54. return srv.handle_SAVE_SNAPSHOT()
  55. case rpcpb.Operation_RESTORE_RESTART_FROM_SNAPSHOT:
  56. return srv.handle_RESTORE_RESTART_FROM_SNAPSHOT()
  57. case rpcpb.Operation_RESTART_FROM_SNAPSHOT:
  58. return srv.handle_RESTART_FROM_SNAPSHOT()
  59. case rpcpb.Operation_SIGQUIT_ETCD_AND_ARCHIVE_DATA:
  60. return srv.handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA()
  61. case rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT:
  62. return srv.handle_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT()
  63. case rpcpb.Operation_BLACKHOLE_PEER_PORT_TX_RX:
  64. return srv.handle_BLACKHOLE_PEER_PORT_TX_RX()
  65. case rpcpb.Operation_UNBLACKHOLE_PEER_PORT_TX_RX:
  66. return srv.handle_UNBLACKHOLE_PEER_PORT_TX_RX()
  67. case rpcpb.Operation_DELAY_PEER_PORT_TX_RX:
  68. return srv.handle_DELAY_PEER_PORT_TX_RX()
  69. case rpcpb.Operation_UNDELAY_PEER_PORT_TX_RX:
  70. return srv.handle_UNDELAY_PEER_PORT_TX_RX()
  71. default:
  72. msg := fmt.Sprintf("operation not found (%v)", req.Operation)
  73. return &rpcpb.Response{Success: false, Status: msg}, errors.New(msg)
  74. }
  75. }
  76. func (srv *Server) handle_INITIAL_START_ETCD(req *rpcpb.Request) (*rpcpb.Response, error) {
  77. if srv.last != rpcpb.Operation_NOT_STARTED {
  78. return &rpcpb.Response{
  79. Success: false,
  80. Status: fmt.Sprintf("%q is not valid; last server operation was %q", rpcpb.Operation_INITIAL_START_ETCD.String(), srv.last.String()),
  81. Member: req.Member,
  82. }, nil
  83. }
  84. err := fileutil.TouchDirAll(srv.Member.BaseDir)
  85. if err != nil {
  86. return nil, err
  87. }
  88. srv.lg.Info("created base directory", zap.String("path", srv.Member.BaseDir))
  89. if srv.etcdServer == nil {
  90. if err = srv.createEtcdLogFile(); err != nil {
  91. return nil, err
  92. }
  93. }
  94. if err = srv.creatEtcd(false); err != nil {
  95. return nil, err
  96. }
  97. if err = srv.saveTLSAssets(); err != nil {
  98. return nil, err
  99. }
  100. if err = srv.startEtcd(); err != nil {
  101. return nil, err
  102. }
  103. if err = srv.loadAutoTLSAssets(); err != nil {
  104. return nil, err
  105. }
  106. // wait some time for etcd listener start
  107. // before setting up proxy
  108. time.Sleep(time.Second)
  109. if err = srv.startProxy(); err != nil {
  110. return nil, err
  111. }
  112. return &rpcpb.Response{
  113. Success: true,
  114. Status: "start etcd PASS",
  115. Member: srv.Member,
  116. }, nil
  117. }
  118. func (srv *Server) startProxy() error {
  119. if srv.Member.EtcdClientProxy {
  120. advertiseClientURL, advertiseClientURLPort, err := getURLAndPort(srv.Member.Etcd.AdvertiseClientURLs[0])
  121. if err != nil {
  122. return err
  123. }
  124. listenClientURL, _, err := getURLAndPort(srv.Member.Etcd.ListenClientURLs[0])
  125. if err != nil {
  126. return err
  127. }
  128. srv.advertiseClientPortToProxy[advertiseClientURLPort] = proxy.NewServer(proxy.ServerConfig{
  129. Logger: srv.lg,
  130. From: *advertiseClientURL,
  131. To: *listenClientURL,
  132. })
  133. select {
  134. case err = <-srv.advertiseClientPortToProxy[advertiseClientURLPort].Error():
  135. return err
  136. case <-time.After(2 * time.Second):
  137. srv.lg.Info("started proxy on client traffic", zap.String("url", advertiseClientURL.String()))
  138. }
  139. }
  140. if srv.Member.EtcdPeerProxy {
  141. advertisePeerURL, advertisePeerURLPort, err := getURLAndPort(srv.Member.Etcd.AdvertisePeerURLs[0])
  142. if err != nil {
  143. return err
  144. }
  145. listenPeerURL, _, err := getURLAndPort(srv.Member.Etcd.ListenPeerURLs[0])
  146. if err != nil {
  147. return err
  148. }
  149. srv.advertisePeerPortToProxy[advertisePeerURLPort] = proxy.NewServer(proxy.ServerConfig{
  150. Logger: srv.lg,
  151. From: *advertisePeerURL,
  152. To: *listenPeerURL,
  153. })
  154. select {
  155. case err = <-srv.advertisePeerPortToProxy[advertisePeerURLPort].Error():
  156. return err
  157. case <-time.After(2 * time.Second):
  158. srv.lg.Info("started proxy on peer traffic", zap.String("url", advertisePeerURL.String()))
  159. }
  160. }
  161. return nil
  162. }
  163. func (srv *Server) stopProxy() {
  164. if srv.Member.EtcdClientProxy && len(srv.advertiseClientPortToProxy) > 0 {
  165. for port, px := range srv.advertiseClientPortToProxy {
  166. if err := px.Close(); err != nil {
  167. srv.lg.Warn("failed to close proxy", zap.Int("port", port))
  168. continue
  169. }
  170. select {
  171. case <-px.Done():
  172. // enough time to release port
  173. time.Sleep(time.Second)
  174. case <-time.After(time.Second):
  175. }
  176. srv.lg.Info("closed proxy",
  177. zap.Int("port", port),
  178. zap.String("from", px.From()),
  179. zap.String("to", px.To()),
  180. )
  181. }
  182. srv.advertiseClientPortToProxy = make(map[int]proxy.Server)
  183. }
  184. if srv.Member.EtcdPeerProxy && len(srv.advertisePeerPortToProxy) > 0 {
  185. for port, px := range srv.advertisePeerPortToProxy {
  186. if err := px.Close(); err != nil {
  187. srv.lg.Warn("failed to close proxy", zap.Int("port", port))
  188. continue
  189. }
  190. select {
  191. case <-px.Done():
  192. // enough time to release port
  193. time.Sleep(time.Second)
  194. case <-time.After(time.Second):
  195. }
  196. srv.lg.Info("closed proxy",
  197. zap.Int("port", port),
  198. zap.String("from", px.From()),
  199. zap.String("to", px.To()),
  200. )
  201. }
  202. srv.advertisePeerPortToProxy = make(map[int]proxy.Server)
  203. }
  204. }
  205. func (srv *Server) createEtcdLogFile() error {
  206. var err error
  207. srv.etcdLogFile, err = os.Create(srv.Member.Etcd.LogOutput)
  208. if err != nil {
  209. return err
  210. }
  211. srv.lg.Info("created etcd log file", zap.String("path", srv.Member.Etcd.LogOutput))
  212. return nil
  213. }
  214. func (srv *Server) creatEtcd(fromSnapshot bool) error {
  215. if !fileutil.Exist(srv.Member.EtcdExec) || srv.Member.EtcdExec != "embed" {
  216. return fmt.Errorf("unknown etcd exec %q or path does not exist", srv.Member.EtcdExec)
  217. }
  218. if fileutil.Exist(srv.Member.EtcdExec) && srv.Member.EtcdExec != "embed" {
  219. etcdPath, etcdFlags := srv.Member.EtcdExec, srv.Member.Etcd.Flags()
  220. if fromSnapshot {
  221. etcdFlags = srv.Member.EtcdOnSnapshotRestore.Flags()
  222. }
  223. u, _ := url.Parse(srv.Member.FailpointHTTPAddr)
  224. srv.lg.Info("creating etcd command",
  225. zap.String("etcd-exec", etcdPath),
  226. zap.Strings("etcd-flags", etcdFlags),
  227. zap.String("failpoint-http-addr", srv.Member.FailpointHTTPAddr),
  228. zap.String("failpoint-addr", u.Host),
  229. )
  230. srv.etcdCmd = exec.Command(etcdPath, etcdFlags...)
  231. srv.etcdCmd.Env = []string{"GOFAIL_HTTP=" + u.Host}
  232. srv.etcdCmd.Stdout = srv.etcdLogFile
  233. srv.etcdCmd.Stderr = srv.etcdLogFile
  234. } else if srv.Member.EtcdExec == "embed" {
  235. cfg, err := srv.Member.Etcd.EmbedConfig()
  236. if err != nil {
  237. return err
  238. }
  239. srv.etcdServer, err = embed.StartEtcd(cfg)
  240. if err != nil {
  241. return err
  242. }
  243. // TODO: set up logging
  244. }
  245. return nil
  246. }
  247. // if started with manual TLS, stores TLS assets
  248. // from tester/client to disk before starting etcd process
  249. func (srv *Server) saveTLSAssets() error {
  250. if srv.Member.PeerCertPath != "" {
  251. if srv.Member.PeerCertData == "" {
  252. return fmt.Errorf("got empty data for %q", srv.Member.PeerCertPath)
  253. }
  254. if err := ioutil.WriteFile(srv.Member.PeerCertPath, []byte(srv.Member.PeerCertData), 0644); err != nil {
  255. return err
  256. }
  257. }
  258. if srv.Member.PeerKeyPath != "" {
  259. if srv.Member.PeerKeyData == "" {
  260. return fmt.Errorf("got empty data for %q", srv.Member.PeerKeyPath)
  261. }
  262. if err := ioutil.WriteFile(srv.Member.PeerKeyPath, []byte(srv.Member.PeerKeyData), 0644); err != nil {
  263. return err
  264. }
  265. }
  266. if srv.Member.PeerTrustedCAPath != "" {
  267. if srv.Member.PeerTrustedCAData == "" {
  268. return fmt.Errorf("got empty data for %q", srv.Member.PeerTrustedCAPath)
  269. }
  270. if err := ioutil.WriteFile(srv.Member.PeerTrustedCAPath, []byte(srv.Member.PeerTrustedCAData), 0644); err != nil {
  271. return err
  272. }
  273. }
  274. if srv.Member.PeerCertPath != "" &&
  275. srv.Member.PeerKeyPath != "" &&
  276. srv.Member.PeerTrustedCAPath != "" {
  277. srv.lg.Info(
  278. "wrote",
  279. zap.String("peer-cert", srv.Member.PeerCertPath),
  280. zap.String("peer-key", srv.Member.PeerKeyPath),
  281. zap.String("peer-trusted-ca", srv.Member.PeerTrustedCAPath),
  282. )
  283. }
  284. if srv.Member.ClientCertPath != "" {
  285. if srv.Member.ClientCertData == "" {
  286. return fmt.Errorf("got empty data for %q", srv.Member.ClientCertPath)
  287. }
  288. if err := ioutil.WriteFile(srv.Member.ClientCertPath, []byte(srv.Member.ClientCertData), 0644); err != nil {
  289. return err
  290. }
  291. }
  292. if srv.Member.ClientKeyPath != "" {
  293. if srv.Member.ClientKeyData == "" {
  294. return fmt.Errorf("got empty data for %q", srv.Member.ClientKeyPath)
  295. }
  296. if err := ioutil.WriteFile(srv.Member.ClientKeyPath, []byte(srv.Member.ClientKeyData), 0644); err != nil {
  297. return err
  298. }
  299. }
  300. if srv.Member.ClientTrustedCAPath != "" {
  301. if srv.Member.ClientTrustedCAData == "" {
  302. return fmt.Errorf("got empty data for %q", srv.Member.ClientTrustedCAPath)
  303. }
  304. if err := ioutil.WriteFile(srv.Member.ClientTrustedCAPath, []byte(srv.Member.ClientTrustedCAData), 0644); err != nil {
  305. return err
  306. }
  307. }
  308. if srv.Member.ClientCertPath != "" &&
  309. srv.Member.ClientKeyPath != "" &&
  310. srv.Member.ClientTrustedCAPath != "" {
  311. srv.lg.Info(
  312. "wrote",
  313. zap.String("client-cert", srv.Member.ClientCertPath),
  314. zap.String("client-key", srv.Member.ClientKeyPath),
  315. zap.String("client-trusted-ca", srv.Member.ClientTrustedCAPath),
  316. )
  317. }
  318. return nil
  319. }
  320. func (srv *Server) loadAutoTLSAssets() error {
  321. if srv.Member.Etcd.PeerAutoTLS {
  322. // in case of slow disk
  323. time.Sleep(time.Second)
  324. fdir := filepath.Join(srv.Member.Etcd.DataDir, "fixtures", "peer")
  325. srv.lg.Info(
  326. "loading client auto TLS assets",
  327. zap.String("dir", fdir),
  328. zap.String("endpoint", srv.EtcdClientEndpoint),
  329. )
  330. certPath := filepath.Join(fdir, "cert.pem")
  331. if !fileutil.Exist(certPath) {
  332. return fmt.Errorf("cannot find %q", certPath)
  333. }
  334. certData, err := ioutil.ReadFile(certPath)
  335. if err != nil {
  336. return fmt.Errorf("cannot read %q (%v)", certPath, err)
  337. }
  338. srv.Member.PeerCertData = string(certData)
  339. keyPath := filepath.Join(fdir, "key.pem")
  340. if !fileutil.Exist(keyPath) {
  341. return fmt.Errorf("cannot find %q", keyPath)
  342. }
  343. keyData, err := ioutil.ReadFile(keyPath)
  344. if err != nil {
  345. return fmt.Errorf("cannot read %q (%v)", keyPath, err)
  346. }
  347. srv.Member.PeerKeyData = string(keyData)
  348. srv.lg.Info(
  349. "loaded peer auto TLS assets",
  350. zap.String("peer-cert-path", certPath),
  351. zap.Int("peer-cert-length", len(certData)),
  352. zap.String("peer-key-path", keyPath),
  353. zap.Int("peer-key-length", len(keyData)),
  354. )
  355. }
  356. if srv.Member.Etcd.ClientAutoTLS {
  357. // in case of slow disk
  358. time.Sleep(time.Second)
  359. fdir := filepath.Join(srv.Member.Etcd.DataDir, "fixtures", "client")
  360. srv.lg.Info(
  361. "loading client TLS assets",
  362. zap.String("dir", fdir),
  363. zap.String("endpoint", srv.EtcdClientEndpoint),
  364. )
  365. certPath := filepath.Join(fdir, "cert.pem")
  366. if !fileutil.Exist(certPath) {
  367. return fmt.Errorf("cannot find %q", certPath)
  368. }
  369. certData, err := ioutil.ReadFile(certPath)
  370. if err != nil {
  371. return fmt.Errorf("cannot read %q (%v)", certPath, err)
  372. }
  373. srv.Member.ClientCertData = string(certData)
  374. keyPath := filepath.Join(fdir, "key.pem")
  375. if !fileutil.Exist(keyPath) {
  376. return fmt.Errorf("cannot find %q", keyPath)
  377. }
  378. keyData, err := ioutil.ReadFile(keyPath)
  379. if err != nil {
  380. return fmt.Errorf("cannot read %q (%v)", keyPath, err)
  381. }
  382. srv.Member.ClientKeyData = string(keyData)
  383. srv.lg.Info(
  384. "loaded client TLS assets",
  385. zap.String("peer-cert-path", certPath),
  386. zap.Int("peer-cert-length", len(certData)),
  387. zap.String("peer-key-path", keyPath),
  388. zap.Int("peer-key-length", len(keyData)),
  389. )
  390. }
  391. return nil
  392. }
  393. // start but do not wait for it to complete
  394. func (srv *Server) startEtcd() error {
  395. if srv.etcdCmd != nil {
  396. srv.lg.Info(
  397. "started etcd",
  398. zap.String("command-path", srv.etcdCmd.Path),
  399. )
  400. return srv.etcdCmd.Start()
  401. }
  402. select {
  403. case <-srv.etcdServer.Server.ReadyNotify():
  404. srv.lg.Info("started embedded etcd")
  405. case <-time.After(time.Minute):
  406. srv.etcdServer.Close()
  407. return fmt.Errorf("took too long to start %v", <-srv.etcdServer.Err())
  408. }
  409. return nil
  410. }
  411. func (srv *Server) handle_RESTART_ETCD() (*rpcpb.Response, error) {
  412. var err error
  413. if !fileutil.Exist(srv.Member.BaseDir) {
  414. err = fileutil.TouchDirAll(srv.Member.BaseDir)
  415. if err != nil {
  416. return nil, err
  417. }
  418. }
  419. if err = srv.creatEtcd(false); err != nil {
  420. return nil, err
  421. }
  422. if err = srv.saveTLSAssets(); err != nil {
  423. return nil, err
  424. }
  425. if err = srv.startEtcd(); err != nil {
  426. return nil, err
  427. }
  428. if err = srv.loadAutoTLSAssets(); err != nil {
  429. return nil, err
  430. }
  431. // wait some time for etcd listener start
  432. // before setting up proxy
  433. // TODO: local tests should handle port conflicts
  434. // with clients on restart
  435. time.Sleep(time.Second)
  436. if err = srv.startProxy(); err != nil {
  437. return nil, err
  438. }
  439. return &rpcpb.Response{
  440. Success: true,
  441. Status: "restart etcd PASS",
  442. Member: srv.Member,
  443. }, nil
  444. }
  445. func (srv *Server) handle_SIGTERM_ETCD() (*rpcpb.Response, error) {
  446. srv.stopProxy()
  447. err := stopWithSig(srv.etcdCmd, syscall.SIGTERM)
  448. if err != nil {
  449. return nil, err
  450. }
  451. srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGTERM.String()))
  452. return &rpcpb.Response{
  453. Success: true,
  454. Status: "killed etcd",
  455. }, nil
  456. }
  457. func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA() (*rpcpb.Response, error) {
  458. srv.stopProxy()
  459. err := stopWithSig(srv.etcdCmd, syscall.SIGQUIT)
  460. if err != nil {
  461. return nil, err
  462. }
  463. srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
  464. if srv.etcdServer != nil {
  465. srv.etcdServer.GetLogger().Sync()
  466. } else {
  467. srv.etcdLogFile.Sync()
  468. srv.etcdLogFile.Close()
  469. }
  470. // for debugging purposes, rename instead of removing
  471. if err = os.RemoveAll(srv.Member.BaseDir + ".backup"); err != nil {
  472. return nil, err
  473. }
  474. if err = os.Rename(srv.Member.BaseDir, srv.Member.BaseDir+".backup"); err != nil {
  475. return nil, err
  476. }
  477. srv.lg.Info(
  478. "renamed",
  479. zap.String("base-dir", srv.Member.BaseDir),
  480. zap.String("new-dir", srv.Member.BaseDir+".backup"),
  481. )
  482. // create a new log file for next new member restart
  483. if !fileutil.Exist(srv.Member.BaseDir) {
  484. err = fileutil.TouchDirAll(srv.Member.BaseDir)
  485. if err != nil {
  486. return nil, err
  487. }
  488. }
  489. return &rpcpb.Response{
  490. Success: true,
  491. Status: "killed etcd and removed base directory",
  492. }, nil
  493. }
  494. func (srv *Server) handle_SAVE_SNAPSHOT() (*rpcpb.Response, error) {
  495. err := srv.Member.SaveSnapshot(srv.lg)
  496. if err != nil {
  497. return nil, err
  498. }
  499. return &rpcpb.Response{
  500. Success: true,
  501. Status: "saved snapshot",
  502. SnapshotInfo: srv.Member.SnapshotInfo,
  503. }, nil
  504. }
  505. func (srv *Server) handle_RESTORE_RESTART_FROM_SNAPSHOT() (resp *rpcpb.Response, err error) {
  506. err = srv.Member.RestoreSnapshot(srv.lg)
  507. if err != nil {
  508. return nil, err
  509. }
  510. resp, err = srv.handle_RESTART_FROM_SNAPSHOT()
  511. if resp != nil && err == nil {
  512. resp.Status = "restored snapshot and " + resp.Status
  513. }
  514. return resp, err
  515. }
  516. func (srv *Server) handle_RESTART_FROM_SNAPSHOT() (resp *rpcpb.Response, err error) {
  517. if err = srv.creatEtcd(true); err != nil {
  518. return nil, err
  519. }
  520. if err = srv.saveTLSAssets(); err != nil {
  521. return nil, err
  522. }
  523. if err = srv.startEtcd(); err != nil {
  524. return nil, err
  525. }
  526. if err = srv.loadAutoTLSAssets(); err != nil {
  527. return nil, err
  528. }
  529. // wait some time for etcd listener start
  530. // before setting up proxy
  531. // TODO: local tests should handle port conflicts
  532. // with clients on restart
  533. time.Sleep(time.Second)
  534. if err = srv.startProxy(); err != nil {
  535. return nil, err
  536. }
  537. return &rpcpb.Response{
  538. Success: true,
  539. Status: "restarted etcd from snapshot",
  540. SnapshotInfo: srv.Member.SnapshotInfo,
  541. }, nil
  542. }
  543. func (srv *Server) handle_SIGQUIT_ETCD_AND_ARCHIVE_DATA() (*rpcpb.Response, error) {
  544. srv.stopProxy()
  545. // exit with stackstrace
  546. err := stopWithSig(srv.etcdCmd, syscall.SIGQUIT)
  547. if err != nil {
  548. return nil, err
  549. }
  550. srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
  551. if srv.etcdServer != nil {
  552. srv.etcdServer.GetLogger().Sync()
  553. } else {
  554. srv.etcdLogFile.Sync()
  555. srv.etcdLogFile.Close()
  556. }
  557. // TODO: support separate WAL directory
  558. if err = archive(
  559. srv.Member.BaseDir,
  560. srv.Member.Etcd.LogOutput,
  561. srv.Member.Etcd.DataDir,
  562. ); err != nil {
  563. return nil, err
  564. }
  565. srv.lg.Info("archived data", zap.String("base-dir", srv.Member.BaseDir))
  566. if srv.etcdServer == nil {
  567. if err = srv.createEtcdLogFile(); err != nil {
  568. return nil, err
  569. }
  570. }
  571. srv.lg.Info("cleaning up page cache")
  572. if err := cleanPageCache(); err != nil {
  573. srv.lg.Warn("failed to clean up page cache", zap.String("error", err.Error()))
  574. }
  575. srv.lg.Info("cleaned up page cache")
  576. return &rpcpb.Response{
  577. Success: true,
  578. Status: "cleaned up etcd",
  579. }, nil
  580. }
  581. // stop proxy, etcd, delete data directory
  582. func (srv *Server) handle_SIGQUIT_ETCD_AND_REMOVE_DATA_AND_STOP_AGENT() (*rpcpb.Response, error) {
  583. srv.stopProxy()
  584. err := stopWithSig(srv.etcdCmd, syscall.SIGQUIT)
  585. if err != nil {
  586. return nil, err
  587. }
  588. srv.lg.Info("killed etcd", zap.String("signal", syscall.SIGQUIT.String()))
  589. if srv.etcdServer != nil {
  590. srv.etcdServer.GetLogger().Sync()
  591. } else {
  592. srv.etcdLogFile.Sync()
  593. srv.etcdLogFile.Close()
  594. }
  595. err = os.RemoveAll(srv.Member.BaseDir)
  596. if err != nil {
  597. return nil, err
  598. }
  599. srv.lg.Info("removed base directory", zap.String("dir", srv.Member.BaseDir))
  600. // stop agent server
  601. srv.Stop()
  602. return &rpcpb.Response{
  603. Success: true,
  604. Status: "destroyed etcd and agent",
  605. }, nil
  606. }
  607. func (srv *Server) handle_BLACKHOLE_PEER_PORT_TX_RX() (*rpcpb.Response, error) {
  608. for port, px := range srv.advertisePeerPortToProxy {
  609. srv.lg.Info("blackholing", zap.Int("peer-port", port))
  610. px.BlackholeTx()
  611. px.BlackholeRx()
  612. srv.lg.Info("blackholed", zap.Int("peer-port", port))
  613. }
  614. return &rpcpb.Response{
  615. Success: true,
  616. Status: "blackholed peer port tx/rx",
  617. }, nil
  618. }
  619. func (srv *Server) handle_UNBLACKHOLE_PEER_PORT_TX_RX() (*rpcpb.Response, error) {
  620. for port, px := range srv.advertisePeerPortToProxy {
  621. srv.lg.Info("unblackholing", zap.Int("peer-port", port))
  622. px.UnblackholeTx()
  623. px.UnblackholeRx()
  624. srv.lg.Info("unblackholed", zap.Int("peer-port", port))
  625. }
  626. return &rpcpb.Response{
  627. Success: true,
  628. Status: "unblackholed peer port tx/rx",
  629. }, nil
  630. }
  631. func (srv *Server) handle_DELAY_PEER_PORT_TX_RX() (*rpcpb.Response, error) {
  632. lat := time.Duration(srv.Tester.UpdatedDelayLatencyMs) * time.Millisecond
  633. rv := time.Duration(srv.Tester.DelayLatencyMsRv) * time.Millisecond
  634. for port, px := range srv.advertisePeerPortToProxy {
  635. srv.lg.Info("delaying",
  636. zap.Int("peer-port", port),
  637. zap.Duration("latency", lat),
  638. zap.Duration("random-variable", rv),
  639. )
  640. px.DelayTx(lat, rv)
  641. px.DelayRx(lat, rv)
  642. srv.lg.Info("delayed",
  643. zap.Int("peer-port", port),
  644. zap.Duration("latency", lat),
  645. zap.Duration("random-variable", rv),
  646. )
  647. }
  648. return &rpcpb.Response{
  649. Success: true,
  650. Status: "delayed peer port tx/rx",
  651. }, nil
  652. }
  653. func (srv *Server) handle_UNDELAY_PEER_PORT_TX_RX() (*rpcpb.Response, error) {
  654. for port, px := range srv.advertisePeerPortToProxy {
  655. srv.lg.Info("undelaying", zap.Int("peer-port", port))
  656. px.UndelayTx()
  657. px.UndelayRx()
  658. srv.lg.Info("undelayed", zap.Int("peer-port", port))
  659. }
  660. return &rpcpb.Response{
  661. Success: true,
  662. Status: "undelayed peer port tx/rx",
  663. }, nil
  664. }