http.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdhttp
  14. import (
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "io/ioutil"
  19. "log"
  20. "net/http"
  21. "net/url"
  22. "path"
  23. "strconv"
  24. "strings"
  25. "time"
  26. "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
  27. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
  28. etcdErr "github.com/coreos/etcd/error"
  29. "github.com/coreos/etcd/etcdserver"
  30. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  31. "github.com/coreos/etcd/pkg/types"
  32. "github.com/coreos/etcd/raft/raftpb"
  33. "github.com/coreos/etcd/store"
  34. "github.com/coreos/etcd/version"
  35. )
  36. const (
  37. // prefixes of client endpoint
  38. keysPrefix = "/v2/keys"
  39. deprecatedMachinesPrefix = "/v2/machines"
  40. adminMembersPrefix = "/v2/admin/members/"
  41. statsPrefix = "/v2/stats"
  42. versionPrefix = "/version"
  43. // prefixes of peer endpoint
  44. raftPrefix = "/raft"
  45. membersPrefix = "/members"
  46. // time to wait for response from EtcdServer requests
  47. defaultServerTimeout = 500 * time.Millisecond
  48. // time to wait for a Watch request
  49. defaultWatchTimeout = 5 * time.Minute
  50. )
  51. var errClosed = errors.New("etcdhttp: client closed connection")
  52. // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
  53. func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
  54. sh := &serverHandler{
  55. server: server,
  56. clusterInfo: server.Cluster,
  57. stats: server,
  58. timer: server,
  59. timeout: defaultServerTimeout,
  60. clock: clockwork.NewRealClock(),
  61. }
  62. mux := http.NewServeMux()
  63. mux.HandleFunc(keysPrefix, sh.serveKeys)
  64. mux.HandleFunc(keysPrefix+"/", sh.serveKeys)
  65. mux.HandleFunc(statsPrefix+"/store", sh.serveStoreStats)
  66. mux.HandleFunc(statsPrefix+"/self", sh.serveSelfStats)
  67. mux.HandleFunc(statsPrefix+"/leader", sh.serveLeaderStats)
  68. mux.HandleFunc(deprecatedMachinesPrefix, sh.serveMachines)
  69. mux.HandleFunc(adminMembersPrefix, sh.serveAdminMembers)
  70. mux.HandleFunc(versionPrefix, sh.serveVersion)
  71. mux.HandleFunc("/", http.NotFound)
  72. return mux
  73. }
  74. // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
  75. func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
  76. sh := &serverHandler{
  77. server: server,
  78. stats: server,
  79. clusterInfo: server.Cluster,
  80. clock: clockwork.NewRealClock(),
  81. }
  82. mux := http.NewServeMux()
  83. mux.HandleFunc(raftPrefix, sh.serveRaft)
  84. mux.HandleFunc(membersPrefix, sh.serveMembers)
  85. mux.HandleFunc("/", http.NotFound)
  86. return mux
  87. }
  88. // serverHandler provides http.Handlers for etcd client and raft communication.
  89. type serverHandler struct {
  90. timeout time.Duration
  91. server etcdserver.Server
  92. stats etcdserver.Stats
  93. timer etcdserver.RaftTimer
  94. clusterInfo etcdserver.ClusterInfo
  95. clock clockwork.Clock
  96. }
  97. func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
  98. if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") {
  99. return
  100. }
  101. ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
  102. defer cancel()
  103. rr, err := parseKeyRequest(r, etcdserver.GenID(), clockwork.NewRealClock())
  104. if err != nil {
  105. writeError(w, err)
  106. return
  107. }
  108. resp, err := h.server.Do(ctx, rr)
  109. if err != nil {
  110. err = trimErrorPrefix(err, etcdserver.StoreKeysPrefix)
  111. writeError(w, err)
  112. return
  113. }
  114. switch {
  115. case resp.Event != nil:
  116. if err := writeKeyEvent(w, resp.Event, h.timer); err != nil {
  117. // Should never be reached
  118. log.Printf("error writing event: %v", err)
  119. }
  120. case resp.Watcher != nil:
  121. ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
  122. defer cancel()
  123. handleKeyWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
  124. default:
  125. writeError(w, errors.New("received response with no Event/Watcher!"))
  126. }
  127. }
  128. // serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'.
  129. func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
  130. if !allowMethod(w, r.Method, "GET", "HEAD") {
  131. return
  132. }
  133. endpoints := h.clusterInfo.ClientURLs()
  134. w.Write([]byte(strings.Join(endpoints, ", ")))
  135. }
  136. func (h serverHandler) serveAdminMembers(w http.ResponseWriter, r *http.Request) {
  137. if !allowMethod(w, r.Method, "GET", "POST", "DELETE") {
  138. return
  139. }
  140. ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout)
  141. defer cancel()
  142. switch r.Method {
  143. case "GET":
  144. if s := strings.TrimPrefix(r.URL.Path, adminMembersPrefix); s != "" {
  145. http.NotFound(w, r)
  146. return
  147. }
  148. ms := struct {
  149. Members []*etcdserver.Member `json:"members"`
  150. }{
  151. Members: h.clusterInfo.Members(),
  152. }
  153. w.Header().Set("Content-Type", "application/json")
  154. if err := json.NewEncoder(w).Encode(ms); err != nil {
  155. log.Printf("etcdhttp: %v", err)
  156. }
  157. case "POST":
  158. ctype := r.Header.Get("Content-Type")
  159. if ctype != "application/json" {
  160. http.Error(w, fmt.Sprintf("bad Content-Type %s, accept application/json", ctype), http.StatusBadRequest)
  161. return
  162. }
  163. b, err := ioutil.ReadAll(r.Body)
  164. if err != nil {
  165. http.Error(w, err.Error(), http.StatusBadRequest)
  166. return
  167. }
  168. raftAttr := etcdserver.RaftAttributes{}
  169. if err := json.Unmarshal(b, &raftAttr); err != nil {
  170. http.Error(w, err.Error(), http.StatusBadRequest)
  171. return
  172. }
  173. validURLs, err := types.NewURLs(raftAttr.PeerURLs)
  174. if err != nil {
  175. http.Error(w, "bad peer urls", http.StatusBadRequest)
  176. return
  177. }
  178. now := h.clock.Now()
  179. m := etcdserver.NewMember("", validURLs, "", &now)
  180. if err := h.server.AddMember(ctx, *m); err != nil {
  181. log.Printf("etcdhttp: error adding node %x: %v", m.ID, err)
  182. writeError(w, err)
  183. return
  184. }
  185. log.Printf("etcdhttp: added node %x with peer urls %v", m.ID, raftAttr.PeerURLs)
  186. w.Header().Set("Content-Type", "application/json")
  187. w.WriteHeader(http.StatusCreated)
  188. if err := json.NewEncoder(w).Encode(m); err != nil {
  189. log.Printf("etcdhttp: %v", err)
  190. }
  191. case "DELETE":
  192. idStr := strings.TrimPrefix(r.URL.Path, adminMembersPrefix)
  193. id, err := strconv.ParseUint(idStr, 16, 64)
  194. if err != nil {
  195. http.Error(w, err.Error(), http.StatusBadRequest)
  196. return
  197. }
  198. log.Printf("etcdhttp: remove node %x", id)
  199. if err := h.server.RemoveMember(ctx, id); err != nil {
  200. log.Printf("etcdhttp: error removing node %x: %v", id, err)
  201. writeError(w, err)
  202. return
  203. }
  204. w.WriteHeader(http.StatusNoContent)
  205. }
  206. }
  207. func (h serverHandler) serveStoreStats(w http.ResponseWriter, r *http.Request) {
  208. if !allowMethod(w, r.Method, "GET") {
  209. return
  210. }
  211. w.Header().Set("Content-Type", "application/json")
  212. w.Write(h.stats.StoreStats())
  213. }
  214. func (h serverHandler) serveSelfStats(w http.ResponseWriter, r *http.Request) {
  215. if !allowMethod(w, r.Method, "GET") {
  216. return
  217. }
  218. w.Header().Set("Content-Type", "application/json")
  219. w.Write(h.stats.SelfStats())
  220. }
  221. func (h serverHandler) serveLeaderStats(w http.ResponseWriter, r *http.Request) {
  222. if !allowMethod(w, r.Method, "GET") {
  223. return
  224. }
  225. w.Header().Set("Content-Type", "application/json")
  226. w.Write(h.stats.LeaderStats())
  227. }
  228. func (h serverHandler) serveVersion(w http.ResponseWriter, r *http.Request) {
  229. if !allowMethod(w, r.Method, "GET") {
  230. return
  231. }
  232. w.Write([]byte("etcd " + version.Version))
  233. }
  234. func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
  235. if !allowMethod(w, r.Method, "POST") {
  236. return
  237. }
  238. wcid := strconv.FormatUint(h.clusterInfo.ID(), 16)
  239. w.Header().Set("X-Etcd-Cluster-ID", wcid)
  240. gcid := r.Header.Get("X-Etcd-Cluster-ID")
  241. if gcid != wcid {
  242. log.Printf("etcdhttp: request ignored due to cluster ID mismatch got %s want %s", gcid, wcid)
  243. http.Error(w, "clusterID mismatch", http.StatusPreconditionFailed)
  244. return
  245. }
  246. b, err := ioutil.ReadAll(r.Body)
  247. if err != nil {
  248. log.Println("etcdhttp: error reading raft message:", err)
  249. http.Error(w, "error reading raft message", http.StatusBadRequest)
  250. return
  251. }
  252. var m raftpb.Message
  253. if err := m.Unmarshal(b); err != nil {
  254. log.Println("etcdhttp: error unmarshaling raft message:", err)
  255. http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
  256. return
  257. }
  258. if err := h.server.Process(context.TODO(), m); err != nil {
  259. log.Println("etcdhttp: error processing raft message:", err)
  260. switch err {
  261. case etcdserver.ErrRemoved:
  262. http.Error(w, "cannot process message from removed node", http.StatusForbidden)
  263. default:
  264. writeError(w, err)
  265. }
  266. return
  267. }
  268. if m.Type == raftpb.MsgApp {
  269. h.stats.UpdateRecvApp(m.From, r.ContentLength)
  270. }
  271. w.WriteHeader(http.StatusNoContent)
  272. }
  273. func (h serverHandler) serveMembers(w http.ResponseWriter, r *http.Request) {
  274. if !allowMethod(w, r.Method, "GET") {
  275. return
  276. }
  277. cid := strconv.FormatUint(h.clusterInfo.ID(), 16)
  278. w.Header().Set("X-Etcd-Cluster-ID", cid)
  279. if r.URL.Path != membersPrefix {
  280. http.Error(w, "bad path", http.StatusBadRequest)
  281. return
  282. }
  283. ms := h.clusterInfo.Members()
  284. w.Header().Set("Content-Type", "application/json")
  285. if err := json.NewEncoder(w).Encode(ms); err != nil {
  286. log.Printf("etcdhttp: %v", err)
  287. }
  288. }
  289. // parseKeyRequest converts a received http.Request on keysPrefix to
  290. // a server Request, performing validation of supplied fields as appropriate.
  291. // If any validation fails, an empty Request and non-nil error is returned.
  292. func parseKeyRequest(r *http.Request, id uint64, clock clockwork.Clock) (etcdserverpb.Request, error) {
  293. emptyReq := etcdserverpb.Request{}
  294. err := r.ParseForm()
  295. if err != nil {
  296. return emptyReq, etcdErr.NewRequestError(
  297. etcdErr.EcodeInvalidForm,
  298. err.Error(),
  299. )
  300. }
  301. if !strings.HasPrefix(r.URL.Path, keysPrefix) {
  302. return emptyReq, etcdErr.NewRequestError(
  303. etcdErr.EcodeInvalidForm,
  304. "incorrect key prefix",
  305. )
  306. }
  307. p := path.Join(etcdserver.StoreKeysPrefix, r.URL.Path[len(keysPrefix):])
  308. var pIdx, wIdx uint64
  309. if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
  310. return emptyReq, etcdErr.NewRequestError(
  311. etcdErr.EcodeIndexNaN,
  312. `invalid value for "prevIndex"`,
  313. )
  314. }
  315. if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
  316. return emptyReq, etcdErr.NewRequestError(
  317. etcdErr.EcodeIndexNaN,
  318. `invalid value for "waitIndex"`,
  319. )
  320. }
  321. var rec, sort, wait, dir, quorum, stream bool
  322. if rec, err = getBool(r.Form, "recursive"); err != nil {
  323. return emptyReq, etcdErr.NewRequestError(
  324. etcdErr.EcodeInvalidField,
  325. `invalid value for "recursive"`,
  326. )
  327. }
  328. if sort, err = getBool(r.Form, "sorted"); err != nil {
  329. return emptyReq, etcdErr.NewRequestError(
  330. etcdErr.EcodeInvalidField,
  331. `invalid value for "sorted"`,
  332. )
  333. }
  334. if wait, err = getBool(r.Form, "wait"); err != nil {
  335. return emptyReq, etcdErr.NewRequestError(
  336. etcdErr.EcodeInvalidField,
  337. `invalid value for "wait"`,
  338. )
  339. }
  340. // TODO(jonboulle): define what parameters dir is/isn't compatible with?
  341. if dir, err = getBool(r.Form, "dir"); err != nil {
  342. return emptyReq, etcdErr.NewRequestError(
  343. etcdErr.EcodeInvalidField,
  344. `invalid value for "dir"`,
  345. )
  346. }
  347. if quorum, err = getBool(r.Form, "quorum"); err != nil {
  348. return emptyReq, etcdErr.NewRequestError(
  349. etcdErr.EcodeInvalidField,
  350. `invalid value for "quorum"`,
  351. )
  352. }
  353. if stream, err = getBool(r.Form, "stream"); err != nil {
  354. return emptyReq, etcdErr.NewRequestError(
  355. etcdErr.EcodeInvalidField,
  356. `invalid value for "stream"`,
  357. )
  358. }
  359. if wait && r.Method != "GET" {
  360. return emptyReq, etcdErr.NewRequestError(
  361. etcdErr.EcodeInvalidField,
  362. `"wait" can only be used with GET requests`,
  363. )
  364. }
  365. pV := r.FormValue("prevValue")
  366. if _, ok := r.Form["prevValue"]; ok && pV == "" {
  367. return emptyReq, etcdErr.NewRequestError(
  368. etcdErr.EcodeInvalidField,
  369. `"prevValue" cannot be empty`,
  370. )
  371. }
  372. // TTL is nullable, so leave it null if not specified
  373. // or an empty string
  374. var ttl *uint64
  375. if len(r.FormValue("ttl")) > 0 {
  376. i, err := getUint64(r.Form, "ttl")
  377. if err != nil {
  378. return emptyReq, etcdErr.NewRequestError(
  379. etcdErr.EcodeTTLNaN,
  380. `invalid value for "ttl"`,
  381. )
  382. }
  383. ttl = &i
  384. }
  385. // prevExist is nullable, so leave it null if not specified
  386. var pe *bool
  387. if _, ok := r.Form["prevExist"]; ok {
  388. bv, err := getBool(r.Form, "prevExist")
  389. if err != nil {
  390. return emptyReq, etcdErr.NewRequestError(
  391. etcdErr.EcodeInvalidField,
  392. "invalid value for prevExist",
  393. )
  394. }
  395. pe = &bv
  396. }
  397. rr := etcdserverpb.Request{
  398. ID: id,
  399. Method: r.Method,
  400. Path: p,
  401. Val: r.FormValue("value"),
  402. Dir: dir,
  403. PrevValue: pV,
  404. PrevIndex: pIdx,
  405. PrevExist: pe,
  406. Wait: wait,
  407. Since: wIdx,
  408. Recursive: rec,
  409. Sorted: sort,
  410. Quorum: quorum,
  411. Stream: stream,
  412. }
  413. if pe != nil {
  414. rr.PrevExist = pe
  415. }
  416. // Null TTL is equivalent to unset Expiration
  417. if ttl != nil {
  418. expr := time.Duration(*ttl) * time.Second
  419. rr.Expiration = clock.Now().Add(expr).UnixNano()
  420. }
  421. return rr, nil
  422. }
  423. // getUint64 extracts a uint64 by the given key from a Form. If the key does
  424. // not exist in the form, 0 is returned. If the key exists but the value is
  425. // badly formed, an error is returned. If multiple values are present only the
  426. // first is considered.
  427. func getUint64(form url.Values, key string) (i uint64, err error) {
  428. if vals, ok := form[key]; ok {
  429. i, err = strconv.ParseUint(vals[0], 10, 64)
  430. }
  431. return
  432. }
  433. // getBool extracts a bool by the given key from a Form. If the key does not
  434. // exist in the form, false is returned. If the key exists but the value is
  435. // badly formed, an error is returned. If multiple values are present only the
  436. // first is considered.
  437. func getBool(form url.Values, key string) (b bool, err error) {
  438. if vals, ok := form[key]; ok {
  439. b, err = strconv.ParseBool(vals[0])
  440. }
  441. return
  442. }
  443. // writeError logs and writes the given Error to the ResponseWriter
  444. // If Error is an etcdErr, it is rendered to the ResponseWriter
  445. // Otherwise, it is assumed to be an InternalServerError
  446. func writeError(w http.ResponseWriter, err error) {
  447. if err == nil {
  448. return
  449. }
  450. log.Println(err)
  451. if e, ok := err.(*etcdErr.Error); ok {
  452. e.Write(w)
  453. } else {
  454. http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  455. }
  456. }
  457. // writeKeyEvent trims the prefix of key path in a single Event under
  458. // StoreKeysPrefix, serializes it and writes the resulting JSON to the given
  459. // ResponseWriter, along with the appropriate headers.
  460. func writeKeyEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error {
  461. if ev == nil {
  462. return errors.New("cannot write empty Event!")
  463. }
  464. w.Header().Set("Content-Type", "application/json")
  465. w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
  466. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  467. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  468. if ev.IsCreated() {
  469. w.WriteHeader(http.StatusCreated)
  470. }
  471. ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix)
  472. return json.NewEncoder(w).Encode(ev)
  473. }
  474. func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
  475. defer wa.Remove()
  476. ech := wa.EventChan()
  477. var nch <-chan bool
  478. if x, ok := w.(http.CloseNotifier); ok {
  479. nch = x.CloseNotify()
  480. }
  481. w.Header().Set("Content-Type", "application/json")
  482. w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
  483. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  484. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  485. w.WriteHeader(http.StatusOK)
  486. // Ensure headers are flushed early, in case of long polling
  487. w.(http.Flusher).Flush()
  488. for {
  489. select {
  490. case <-nch:
  491. // Client closed connection. Nothing to do.
  492. return
  493. case <-ctx.Done():
  494. // Timed out. net/http will close the connection for us, so nothing to do.
  495. return
  496. case ev, ok := <-ech:
  497. if !ok {
  498. // If the channel is closed this may be an indication of
  499. // that notifications are much more than we are able to
  500. // send to the client in time. Then we simply end streaming.
  501. return
  502. }
  503. ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix)
  504. if err := json.NewEncoder(w).Encode(ev); err != nil {
  505. // Should never be reached
  506. log.Printf("error writing event: %v\n", err)
  507. return
  508. }
  509. if !stream {
  510. return
  511. }
  512. w.(http.Flusher).Flush()
  513. }
  514. }
  515. }
  516. // allowMethod verifies that the given method is one of the allowed methods,
  517. // and if not, it writes an error to w. A boolean is returned indicating
  518. // whether or not the method is allowed.
  519. func allowMethod(w http.ResponseWriter, m string, ms ...string) bool {
  520. for _, meth := range ms {
  521. if m == meth {
  522. return true
  523. }
  524. }
  525. w.Header().Set("Allow", strings.Join(ms, ","))
  526. http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  527. return false
  528. }
  529. func trimEventPrefix(ev *store.Event, prefix string) *store.Event {
  530. if ev == nil {
  531. return nil
  532. }
  533. ev.Node = trimNodeExternPrefix(ev.Node, prefix)
  534. ev.PrevNode = trimNodeExternPrefix(ev.PrevNode, prefix)
  535. return ev
  536. }
  537. func trimNodeExternPrefix(n *store.NodeExtern, prefix string) *store.NodeExtern {
  538. if n == nil {
  539. return nil
  540. }
  541. n.Key = strings.TrimPrefix(n.Key, prefix)
  542. for _, nn := range n.Nodes {
  543. nn = trimNodeExternPrefix(nn, prefix)
  544. }
  545. return n
  546. }
  547. func trimErrorPrefix(err error, prefix string) error {
  548. if e, ok := err.(*etcdErr.Error); ok {
  549. e.Cause = strings.TrimPrefix(e.Cause, prefix)
  550. }
  551. return err
  552. }